Я реализую IBackingMap для своей топологии Trident для хранения кортежей в ElasticSearch (я знаю, что в GitHub уже существует несколько реализаций интеграции Trident / ElasticSearch, однако я решил реализовать собственный вариант, который лучше подходит для моей задачи).
Итак, моя реализация - классическая с фабрикой:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName).build();
// TODO add a possibility to close the client
client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(host, port));
}
// the actual implementation is left out
}
Вы видите, что он получает имя хоста / порта / кластера в качестве входных параметров и создает клиент ElasticSearch как член класса, НО ЭТО НИКОГДА НЕ ЗАКРЫВАЕТ КЛИЕНТА.
Затем он используется изнутри топологии довольно знакомым способом:
tridentTopology.newStream("spout", spout)
// ...some processing steps here...
.groupBy(aggregationFields)
.persistentAggregate(
ElasticSearchBackingMap.getFactoryFor(
ElasticSearchConfig.ES_HOST,
ElasticSearchConfig.ES_PORT,
ElasticSearchConfig.ES_CLUSTER_NAME
),
new Fields(FieldNames.OUTCOME),
new BatchAggregator(),
new Fields(FieldNames.AGGREGATED));
Эта топология обернута в некоторую общедоступную статическую пустоту main, упакована в jar и отправлена в Storm для выполнения.
Вопрос в том, стоит ли мне беспокоиться о закрытии соединения ElasticSearch или это личное дело Storm? Если это не делает Storm, как и когда в жизненном цикле топологии я должен это делать?
Заранее спасибо!
BatchAggregator
в вашем коде. Но я бы тоже хотел увидеть решение получше ... - person dedek   schedule 23.07.2015finalize()
методы могут помочь ... Пожалуйста, опубликуйте любые рабочие решение, если вы найдете ... - person dedek   schedule 05.08.2015