Как закрыть соединение с базой данных, открытое реализацией IBackingMap в топологии Storm Trident?

Я реализую IBackingMap для своей топологии Trident для хранения кортежей в ElasticSearch (я знаю, что в GitHub уже существует несколько реализаций интеграции Trident / ElasticSearch, однако я решил реализовать собственный вариант, который лучше подходит для моей задачи).

Итак, моя реализация - классическая с фабрикой:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) {

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName).build();

        // TODO add a possibility to close the client
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host, port));
    }

    // the actual implementation is left out
}

Вы видите, что он получает имя хоста / порта / кластера в качестве входных параметров и создает клиент ElasticSearch как член класса, НО ЭТО НИКОГДА НЕ ЗАКРЫВАЕТ КЛИЕНТА.

Затем он используется изнутри топологии довольно знакомым способом:

tridentTopology.newStream("spout", spout)
            // ...some processing steps here...
            .groupBy(aggregationFields)
            .persistentAggregate(
                    ElasticSearchBackingMap.getFactoryFor(
                            ElasticSearchConfig.ES_HOST,
                            ElasticSearchConfig.ES_PORT,
                            ElasticSearchConfig.ES_CLUSTER_NAME
                    ),
                    new Fields(FieldNames.OUTCOME),
                    new BatchAggregator(),
                    new Fields(FieldNames.AGGREGATED));

Эта топология обернута в некоторую общедоступную статическую пустоту main, упакована в jar и отправлена ​​в Storm для выполнения.

Вопрос в том, стоит ли мне беспокоиться о закрытии соединения ElasticSearch или это личное дело Storm? Если это не делает Storm, как и когда в жизненном цикле топологии я должен это делать?

Заранее спасибо!


person bopcat    schedule 26.02.2015    source источник
comment
TransportClient должен быть синглтоном для каждого штормового работника. список пользователей. На самом деле, я думаю, вам не нужно закрывать Java-клиент, потому что топология шторма никогда не должна останавливаться.   -  person fhussonnois    schedule 13.05.2015
comment
Взлом может быть таким: создать синглтон для каждого рабочего, например. при создании первого состояния и закрытии этого синглтона в методе очистки вашего агрегатора - я вижу BatchAggregator в вашем коде. Но я бы тоже хотел увидеть решение получше ...   -  person dedek    schedule 23.07.2015
comment
Спасибо @dedek за взлом! Теперь я реализую состояние отправки событий в Kafka с новейшей версией Kafka Producer API, и оно должно быть закрыто, иначе есть риск потери событий. BatchAggregator в моем коде реализует ReducerAggregator, у которого нет cleanup (). Но в стандартной функции есть очистка, которая позволяет реализовать что-то вроде CloseConnectionFunction [facepalm]. Вопрос в том, вызывает ли убийство топологии триггер cleanup (), я никогда не видел, чтобы он запускался на LocalCluster ... Завтра попробую с настоящим кластер и посмотреть ...   -  person bopcat    schedule 04.08.2015
comment
@bopcat Вы, вероятно, правы, говоря о пропавшем триггере очистки. См. Это сообщение: qnalist.com/questions/5082578/ Тогда, надеюсь, ловушка отключения JVM или finalize() методы могут помочь ... Пожалуйста, опубликуйте любые рабочие решение, если вы найдете ...   -  person dedek    schedule 05.08.2015
comment
@dedek, пожалуйста, посмотрите мой ответ ниже. Конечно, не самый лучший, так что будем надеяться, что когда-нибудь STORM-49 будет реализован.   -  person bopcat    schedule 05.08.2015


Ответы (1)


Хорошо, отвечая на свой вопрос.

Прежде всего, еще раз спасибо @dedek за предложения и восстановление заявки в Storm's Jira.

Наконец, поскольку официального способа сделать это нет, я решил использовать метод cleanup () фильтра Trident. Пока что я проверил следующее (для Storm v. 0.9.4):

С LocalCluster

  • cleanup () вызывается при завершении работы кластера
  • cleanup () НЕ вызывается при уничтожении топологии, это не должно быть трагедией, очень вероятно, что LocalCluster в любом случае не будет использоваться для реальных развертываний

С настоящим кластером

  • он вызывается при уничтожении топологии, а также при остановке рабочего процесса с помощью pkill -TERM -u storm -f 'backtype.storm.daemon.worker'
  • он не вызывается, если воркер убит с помощью kill -9, или когда он выйдет из строя, или, к сожалению, когда воркер умирает из-за исключения

В целом это дает более или менее приличную гарантию вызова cleanup () при условии, что вы будете осторожны с обработкой исключений (в любом случае я обычно добавляю «грозовые ловушки» к каждому из моих примитивов Trident).

Мой код:

public class CloseFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);

    private final Closeable[] closeables;

    public CloseFilter(Closeable... closeables) {
        this.closeables = closeables;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        return true;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {

    }

    @Override
    public void cleanup() {
        for (Closeable c : closeables) {
            try {
                c.close();
            } catch (Exception e) {
                LOG.warn("Failed to close an instance of {}", c.getClass(), e);
            }
        }
    }
}

Однако было бы неплохо, если бы однажды хуки для закрытия соединений стали частью API.

person bopcat    schedule 05.08.2015