Kafka Producer Timeout Проблема с подключением к концентратору сообщений в Bluemix

Я пытаюсь подключиться к концентратору сообщений IBM Bluemix и создать сообщение с использованием java, следуя примеру

https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl

producer.properties

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
acks=-1
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.truststore.password=changeit
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/security/cacerts

jaas.conf.template

KafkaClient {
    com.ibm.messagehub.login.MessageHubLoginModule required
    serviceName="kafka"
    user="$USERNAME"
    password="$PASSWORD";
};

Фрагмент кода Producer.java

ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
                "MyTopic",
                KEY.getBytes("UTF-8"),
                "MESSAGE".getBytes("UTF-8"));

        // Synchronously wait for a response from Message Hub / Kafka.
        RecordMetadata m = kafkaProducer.send(record).get();

Проблема в том, что я получаю исключение тайм-аута, когда пытаюсь получить Future RecordMetadata

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)

Прочитайте предыдущий пост на ту же тему

Время ожидания подключения к концентратору сообщений в Bluemix

И возможная упомянутая причина заключалась в том, что тема не была создана. Я вижу тему в консоли bluemix и для проверки я вызвал остальную службу, чтобы получить список тем перед отправкой сообщения.

RESTRequest restApi = new RESTRequest(getRestHost(),getApiKey());

String topics = restApi.get("/admin/topics", false);

logger.info("Topics present in the system: " + topics);

Он возвращает тему, в которой я пытаюсь отправить сообщение, но получаю сообщение об ошибке тайм-аута.

Может кто-нибудь, пожалуйста, помогите мне в отладке проблемы

ОБНОВЛЕНИЕ

Основываясь на комментариях, я включил журналы отладки для kafka, и вот последовательность журналов

2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample]    org.apache.kafka.clients.NetworkClient   : Initialize connection to node -5 for sending metadata request
2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient   : Initiating connection to node -5 at kafka05-prod01.messagehub.services.us-south.bluemix.net:9093.
2016-11-23 08:48:20.914 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator      : Set SASL client state to SEND_HANDSHAKE_REQUEST
2016-11-23 08:48:20.915 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator      : Creating SaslClient: client=messagehub/[email protected];service=kafka;serviceHostname=kafka05-prod01.messagehub.services.us-south.bluemix.net;mechs=[PLAIN]
2016-11-23 08:48:20.979 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--5.bytes-sent
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--5.bytes-received
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--5.latency
2016-11-23 08:48:20.982 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient   : Completed connection to node -5
2016-11-23 08:48:21.080 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator      : Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE

2016-11-23 08:48:21.264 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator      : Set SASL client state to INITIAL
2016-11-23 08:48:21.265 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator      : Set SASL client state to INTERMEDIATE
2016-11-23 08:48:21.284 DEBUG 72885 --- [sage-hub-sample]  o.apache.kafka.common.network.Selector   : Connection with kafka05-prod01.messagehub.services.us-south.bluemix.net/23.246.202.55 disconnected

java.io.EOFException: null
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Thread.java:745)

Все вышеперечисленное регистрируется для любого из 5 брокеров. Кроме того, это все операторы отладки, поэтому я не уверен, являются ли они ошибками.

-Татха


person Tatha    schedule 22.11.2016    source источник


Ответы (2)


Я вижу пару проблем с вашим файлом JAAS:

  • Образец, на который вы ссылаетесь, использует Kafka 0.10.0.X, поэтому вам не следует использовать старый модуль входа в Message Hub для Kafka 0.9. Поэтому замените «com.ibm.messagehub.login.MessageHubLoginModule» на «org.apache.kafka.common.security.plain.PlainLoginModule».

  • Поле для имени пользователя называется «имя пользователя», а не «пользователь». Должно быть имя пользователя = "$USERNAME"

person Mickael Maison    schedule 05.12.2016
comment
Спасибо @Mickael, это были изменения, которые решили проблему. - person Tatha; 06.12.2016

пожалуйста, включите уровень log4j DEBUG, и трассировка клиента Kafka может помочь выявить любые проблемы с подключением. Попробуйте изменить последнюю строку на log4j.logger.org.apache.kafka=DEBUG в файле log4j.properties, перестройте и посмотрите на подробный вывод. Не стесняйтесь репостить это.

person Edoardo Comar    schedule 23.11.2016
comment
Спасибо, что изучили это. Я обновил Q с помощью журналов отладки apache.kafka. Похоже на проблему с сетью/подключением, как вы упомянули, но я не уверен, почему это происходит. - person Tatha; 23.11.2016