kafka.common.KafkaException: не удалось проанализировать информацию о брокере из zookeeper из EC2 для эластичного поиска

У меня установлен aws MSK, и я пытаюсь перенести записи из MSK в эластичный поиск. Я могу помещать данные в MSK в формат json. Хочу погрузиться в эластичный поиск. Я могу все настроить правильно. Это то, что я сделал на инстансе EC2

wget /usr/local http://packages.confluent.io/archive/3.1/confluent-oss-3.1.2-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-oss-3.1.2-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-3.1.2 /usr/local/confluent

/usr/local/confluent/etc/kafka-connect-elasticsearch

После этого я изменил kafka-connect-elasticsearch и установил свой URL-адрес эластичного поиска

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect

Производитель отправляет сообщение, как показано ниже fomrat

{
        "data": {
                "RequestID":    517082653,
                "ContentTypeID":        9,
                "OrgID":        16145,
                "UserID":       4,
                "PromotionStartDateTime":       "2019-12-14T16:06:21Z",
                "PromotionEndDateTime": "2019-12-14T16:16:04Z",
                "SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
        },
        "metadata":     {
                "timestamp":    "2019-12-29T10:37:31.502042Z",
                "record-type":  "data",
                "operation":    "insert",
                "partition-key-type":   "schema-table",
                "schema-name":  "dbo",
                "table-name":   "TRFSDIQueue"
        }
}

Я немного запутался в том, как здесь начнется подключение kafka? если да, как я могу это начать?

Я также запустил реестр схем, как показано ниже, что дало мне ошибку.

/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties

Когда я это сделаю, я получаю ошибку ниже

[2019-12-29 13:49:17,861] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT:/

Пожалуйста помоги .

Как было предложено в ответе, я обновил версию подключения kafka, но затем я начал получать ошибку ниже

 ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:61)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:72)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:39)
        at io.confluent.rest.Application.createServer(Application.java:201)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:41)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:168)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:111)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:208)
        ... 5 more
Caused by: java.util.concurrent.TimeoutException
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:161)
        ... 7 more

person Atharv Thakur    schedule 29.12.2019    source источник
comment
Возможно, вам придется вручную создать тему _schemas, или над этой ошибкой есть журналы, в которых указано, почему истекает время ожидания.   -  person OneCricketeer    schedule 29.12.2019
comment
@ cricket_007 Я просто попробую версию 5.2 kafka connect. Я пробовал с 5.1.   -  person Atharv Thakur    schedule 29.12.2019


Ответы (1)


Во-первых, Confluent Platform 3.1.2 довольно старая. Я предлагаю вам получить версию, которая соответствует версии Kafka

Вы запускаете Kafka Connect, используя соответствующие connect-* скрипты и свойства, расположенные в папках bin и etc / kafka.

Например,

/usr/local/confluent/bin/connect-standalone \
  /usr/local/confluent/etc/kafka/kafka-connect-standalone.properties \ 
  /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart.properties

Если это сработает, вы можете перейти к использованию команды подключения-распределения.

Что касается реестра схемы, вы можете найти в нем проблемы Github для нескольких людей, пытающихся заставить MSK работать, но основная проблема связана с тем, что MSK не раскрывает прослушиватель PLAINTEXT, а реестр схемы не поддерживает именованные прослушиватели. (Это могло измениться с версии 5.x)


Вы также можете попробовать использовать контейнеры Connect и Schema Registry в ECS / EKS вместо извлечения на машине EC2.

person OneCricketeer    schedule 29.12.2019
comment
Вы правы. Я обновил версию 5 и думаю, что ошибка исчезла, но я начал получать другую ошибку, например ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63) - person Atharv Thakur; 29.12.2019
comment
Поскольку ваши данные пока находятся в формате JSON, я предлагаю пропустить реестр схем, если вас действительно интересует только коннектор Elasticsearch. Вы можете открыть отдельный пост о проблемах с реестром схем. - person OneCricketeer; 29.12.2019
comment
Но как я могу пропустить реестр схем? Это был также мой вопрос, если мои данные в формате JSON, нужен ли мне реестр схем? - person Atharv Thakur; 29.12.2019
comment
Kafka Connect с конвертером json не требует реестра схем. Свойство schemas.enable означает другое. Кроме того, в реестре схем в любом случае хранятся только схемы Avro. - person OneCricketeer; 29.12.2019
comment
Извините, я не получил нашу пинту .. Итак, что мне делать после kafka-connect-elasticsearch? Я новичок в кафке, поэтому, возможно, заранее извиняюсь за глупые вопросы. - person Atharv Thakur; 29.12.2019
comment
Вам нужно запустить Kafka Connect, и все, kafka.apache.org/documentation/#connect_running - person OneCricketeer; 29.12.2019
comment
Я еще больше запутался. Так что до сих пор все, что я делал, бесполезно. Не будет ли эта команда /usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties запускать это? - person Atharv Thakur; 29.12.2019
comment
Опять же, похоже, что вам не нужен реестр схем. Вы не используете Avro. Вместо этого вы хотите /usr/local/confluent/bin/connect-standalone /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart.properties - person OneCricketeer; 29.12.2019
comment
Итак, я не получаю никакого сообщения, но когда я запустил указанную выше команду, я получаю это INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:62) Означает ли это, что мой коннектор работает? - person Atharv Thakur; 29.12.2019
comment
Нет, я забыл привести правильные аргументы. Я обновил свой ответ. И все же в документации показано, как работают команды - person OneCricketeer; 30.12.2019