Ошибка при загрузке данных из Storm в ElasticSearch

Я использую Storm 1.1.0 и Elasticsearch 5.5. Я хочу хранить свои кортежи из Storm в ES и для этого использовал родную библиотеку Storm для Elastic.

Моя запись POM.xml:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-elasticsearch</artifactId>
        <version>1.1.0</version>
    </dependency>       

Я инициирую свой ElasticBolt следующим образом:

EsConfig esConfig = new EsConfig("elasticseach", new String[]{"localhost:9200"});
EsTupleMapper tupleMapper = new MyMapper(); //custom mapper
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);

Когда я запускаю свою топологию, болт хранилища выдает мне эту ошибку:

    20101 [Thread-24-EsPersistence-executor[1 1]] ERROR o.a.s.d.executor - 
org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes are available: []
    at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:305) ~[elasticsearch-1.6.0.jar:?]
    at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:200) ~[elasticsearch-1.6.0.jar:?]
    at org.elasticsearch.client.transport.support.InternalTransportClient.execute(InternalTransportClient.java:106) ~[elasticsearch-1.6.0.jar:?]
    at org.elasticsearch.client.support.AbstractClient.index(AbstractClient.java:102) ~[elasticsearch-1.6.0.jar:?]
    at org.elasticsearch.client.transport.TransportClient.index(TransportClient.java:340) ~[elasticsearch-1.6.0.jar:?]
    at org.elasticsearch.action.index.IndexRequestBuilder.doExecute(IndexRequestBuilder.java:266) ~[elasticsearch-1.6.0.jar:?]
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:91) ~[elasticsearch-1.6.0.jar:?]
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:65) ~[elasticsearch-1.6.0.jar:?]
    at org.apache.storm.elasticsearch.bolt.EsIndexBolt.process(EsIndexBolt.java:65) [storm-elasticsearch-1.1.0.jar:1.1.0]
    at org.apache.storm.topology.base.BaseTickTupleAwareRichBolt.execute(BaseTickTupleAwareRichBolt.java:38) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.daemon.executor$fn__5044$tuple_action_fn__5046.invoke(executor.clj:727) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4965.invoke(executor.clj:459) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.disruptor$clojure_handler$reify__4480.onEvent(disruptor.clj:40) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.daemon.executor$fn__5044$fn__5057$fn__5110.invoke(executor.clj:846) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

На первый взгляд может показаться, что библиотека не поддерживает Elastic 5.5 и ей нужен Elastic 1.6, но, взглянув на pom.xml библиотеки storm-elasticsearch, на самом деле они используют ES 5.X.

<properties>
    <elasticsearch.version>5.2.2</elasticsearch.version>
    <elasticsearch.test.version>2.4.4</elasticsearch.test.version>
</properties>

См. https://github.com/apache/storm/blob/master/external/storm-elasticsearch/pom.xml

Что мне не хватает в моей настройке здесь? И действительно ли storm-elasticsearch поддерживает Elastic 5.5?

Редактировать:

Из собственных журналов ES я получаю следующее сообщение об ошибке:

[2017-08-19T22:21:01,093][WARN ][o.e.t.n.Netty4Transport  ] [node-1] exception caught on transport layer [[id: 0xfe71a901, L:/10.0.4.70:9300 - R:/10.0.200.6:36344]], closing connection
java.lang.IllegalStateException: Received message from unsupported version: [1.0.0] minimal compatible version is: [5.0.0]
        at org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1379) ~[elasticsearch-5.5.0.jar:5.5.0]
        at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:74) ~[transport-netty4-5.5.0.jar:5.5.0]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-codec-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297) [netty-codec-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413) [netty-codec-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) [netty-codec-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.11.Final.jar:4.1.11.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-transport-4.1.11.Final.jar:4.1.11.Final]

person Mnemosyne    schedule 19.08.2017    source источник


Ответы (1)


Вместо этого используйте порт 9300 (порт управления по умолчанию). 9200 — это порт по умолчанию для вызовов REST.

person Antoniossss    schedule 19.08.2017
comment
Я внес предложенное вами изменение, но ошибка сохраняется. Совместим ли Storm с Elastic 5.X? - person Mnemosyne; 19.08.2017
comment
Это то, чего я не знаю, но ошибка исходит от базового клиента elasticsearch, поэтому я предполагаю, что это ошибка конфигурации. Мб эластик вообще не бегает. У вас настроен кластер с именем elasticsearch ??(это имя кластера по умолчанию). - person Antoniossss; 19.08.2017
comment
Привет, да, кластер работает. Постоянно проверяю здоровье. elasticsearch также является его именем, так что это тоже правильно. Что меня удивляет, так это то, почему в нем упоминается банка 1,6. Насколько это актуально? - person Mnemosyne; 19.08.2017
comment
хм, в общем, для ES версии X вы должны использовать ESClient версии X. Поскольку это разница в 5 версиях, возможно, что транспортный уровень изменился и больше не совместим. Попробуйте простой клиент ES версии 5.x на стороне. Если это работает с теми же настройками, вам не повезло. Если все еще нет - проблема с конфигурацией. - person Antoniossss; 19.08.2017
comment
Я не совсем уверен, что вы имеете в виду под «Попробуйте простой клиент ES версии 5.x». Пробовал добавить запись через командную строку, настройки те же (только порт 9200 вместо 9300) и все заработало, запись добавилась. - person Mnemosyne; 19.08.2017
comment
Проверьте официальный простой клиент ES для Java и посмотрите, сможете ли вы подключиться к 9300. elastic.co/guide/en/elasticsearch/client/java-api/current/ - person Antoniossss; 19.08.2017
comment
Я проверил журналы для Elastic, и журнал ошибок на их конце интересен: [2017-08-19T22:21:01,093][WARN ][o.e.t.n.Netty4Transport ] [node-1] exception caught on transport layer [[id: 0xfe71a901, L:/10.0.4.70:9300 - R:/10.0.200.6:36344]], closing connection java.lang.IllegalStateException: Received message from unsupported version: [1.0.0] minimal compatible version is: [5.0.0] Указывает ли это на несовместимость библиотеки? - person Mnemosyne; 19.08.2017
comment
Что это? Вы нигде этого не дописали - person Antoniossss; 19.08.2017
comment
извините, я нажал отправить по ошибке. Я вставил всю трассировку стека ошибки в сообщение. - person Mnemosyne; 20.08.2017
comment
И ты получил свой ответ. Несовместимые версии. Попробуйте включить ES transport 5.x в свой проект и исключить его из storm-elasticsearch. Может сработает, но скорее всего не сработает. В том случае, если у storm нет специальной версии для ES5, вам не повезло и это не сработает. Вам придется делать все своими руками. Клиент REST гораздо более обратно совместим, возможно, у Storm есть несколько портов для ES, которые используют REST под капотом вместо обычного транспортного клиента. - person Antoniossss; 20.08.2017
comment
Проклятие. Мне придется искать обходной путь или переключиться на hadoop-elastic. Спасибо за помощь в любом случае :) - person Mnemosyne; 20.08.2017