Как отправить json-сообщение из Kinesis в MSK, а затем в эластичный поиск с помощью kafka connect

Я настроил все готово, и поток тоже работает. Я отправляю свои данные из потока Kinesis в MSK с помощью лямбда-функции, и формат сообщения указан ниже.

{
        "data": {
                "RequestID":    517082653,
                "ContentTypeID":        9,
                "OrgID":        16145,
                "UserID":       4,
                "PromotionStartDateTime":       "2019-12-14T16:06:21Z",
                "PromotionEndDateTime": "2019-12-14T16:16:04Z",
                "SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
        },
        "metadata":     {
                "timestamp":    "2019-12-29T10:37:31.502042Z",
                "record-type":  "data",
                "operation":    "insert",
                "partition-key-type":   "schema-table",
                "schema-name":  "dbo",
                "table-name":   "TRFSDIQueue"
        }
}

Это сообщение json, которое я отправляю в тему kafka, как показано ниже

props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("producer.type", "async");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            System.out.println("Inside loop successfully");
            try {
                producer.send(
                        new ProducerRecord<String, String>(topicName, new String(rec.getKinesis().getData().array())));
                Thread.sleep(1000);

                System.out.println("Message sent successfully");
            } catch (Exception e) {
                System.out.println("------------Exception message=-------------" + e.toString());
            }

            finally {
                producer.flush();
                producer.close();
            }

Когда я запускаю kafka connect для эластичного поиска, я получаю ошибку, например

DataException: Converting byte[] to Kafka Connect data failed due to serialization error

Также я изменил quickstart-elasticsearch.properties и изменил сериализатор значения ключа как строку.

Когда это был json, он выдавал ошибку.

Я вижу, что индексы создаются с именем темы kafka в эластичном поиске, но без записи.

Так что, пожалуйста, помогите мне с некоторыми затруднениями. 1. Правильно ли я отправляю сообщение из потока продюсера kinesis? Я использую

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

или я должен использовать здесь json. Но json как такового нет.

  1. Или мне нужно использовать сериализатор json в quickstart-elasticsearch.properties?

  2. Если событие вставлено, тогда оно будет вставлять запись в поиск elastci, а как насчет удаления и обновления, Kafka-connect обрабатывает удаление и обновление для нас в эластичном поиске?

Заранее спасибо


person SUDARSHAN    schedule 01.01.2020    source источник
comment
Никогда не следует спать продюсеру. Это не то, как вы проверяете доставку сообщения. Вы должны использовать producer.send().get(), чтобы сделать его блокирующим вызовом, и даже добавить OnCompleteListener, чтобы узнать, какой раздел и смещение были созданы для ... В любом случае, покажите конфигурацию вашего коннектора.   -  person OneCricketeer    schedule 01.01.2020
comment
И не могли бы вы предоставить больше информации о трассировке стека?   -  person OneCricketeer    schedule 01.01.2020


Ответы (1)


Чтобы получить 30-дневную бесплатную пробную версию, вы можете использовать Kinesis Source Connector., или вы можете узнать, как написать собственный Source Connector и развернуть его вместе с приемником Elasticsearch, вместо того, чтобы вообще использовать лямбда ...


Во-вторых, работайте в обратном направлении. Можно ли создать фальшивую тему и отправлять записи того же формата без лямбда? Они попадают в Кафку? Как насчет Elasticsearch? Также удалите Kibana из уравнения, если вы его используете, а он не работает.

Затем сконцентрируйтесь на лямбда-интеграции.


Чтобы ответить на ваши вопросы

1) Вы отправляете JSON в виде строки. Вам не нужен отдельный сериализатор для JSON, если вы не отправляете классы POJO, которые отображаются в строки JSON в интерфейсе сериализатора.

Вы отправляете записи JSON, поэтому вы должны использовать JSONConverter, в Connect, да. Однако я не думаю, что сопоставления Elasticsearch будут созданы автоматически, если вы не имеют схему и полезную нагрузку, поэтому простой обходной путь - заранее создать сопоставление индекса ES (однако, если вы уже знаете это, у вас есть разработал схему, поэтому в конечном итоге ответственность за отправку правильной записи лежит на коде производителя).

Если вы заранее определите сопоставление, вы сможете просто использовать StringConverter в Connect

Единственное, что я бы изменил в вашем коде производителя, - это количество повторных попыток выше 0. И используйте try с ресурсами вместо того, чтобы явно закрывать производителя.

//... parse input 
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
//... send record 
} 

2) Вы можете найти коннектор в проблемах Github, но в последний раз я проверял, он выполняет полные обновления и вставки документов, без частичных обновлений или каких-либо удалений.

person OneCricketeer    schedule 01.01.2020
comment
Спасибо за подробный ответ. Просто несколько пояснений, так что в основном мне нужно создать схему, а затем использовать реестр схем? Также лямбда предназначена только для целей тестирования. Мой реальный вариант использования - отправлять данные из Aurora в MSK и из MSK в эластичный поиск. Для этого снова мне нужно иметь kafka-connect-jdbc, который я уже видел в комплекте. Я пытаюсь есть, но есть некоторые пояснения. Могу ли я обновить свой вопрос или я думаю, что мне следует задать еще один? Я думаю, мне следует спросить еще один. Но большое спасибо за обучение, которое я получаю на этом форуме, и благодарен вам, особенно за то, что вы ответили на очень простые - person SUDARSHAN; 01.01.2020
comment
Вы можете использовать Avro GenericRecord вместо этого, отредактировав код производителя, конечно, но это не требуется. Не путайте наличие схемы JSON с необходимостью использования реестра схем. Я бы лично попробовал использовать Дебезиум от Авроры - person OneCricketeer; 01.01.2020
comment
Извините, я не понял вашу точку зрения. Хорошо, позвольте мне задать еще один вопрос со всеми подробностями. - person SUDARSHAN; 01.01.2020
comment
Вам не нужен Avro, но вы можете создавать продюсеров кафки, используя его. Однако затем вы должны развернуть реестр схем для хранения схем Avro. В противном случае вы можете встроить определения схемы Kafka Connect только с помощью конвертера JSON. Если у вас вообще нет схем, вы должны создать сопоставление elasticsearch вручную, чтобы правильно проиндексировать ваши данные ... Или используйте logstash kafka consumer, который знает, как лучше анализировать JSON в elasticsearch без схем подключения kafka - person OneCricketeer; 01.01.2020