Я настроил все готово, и поток тоже работает. Я отправляю свои данные из потока Kinesis в MSK с помощью лямбда-функции, и формат сообщения указан ниже.
{
"data": {
"RequestID": 517082653,
"ContentTypeID": 9,
"OrgID": 16145,
"UserID": 4,
"PromotionStartDateTime": "2019-12-14T16:06:21Z",
"PromotionEndDateTime": "2019-12-14T16:16:04Z",
"SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
},
"metadata": {
"timestamp": "2019-12-29T10:37:31.502042Z",
"record-type": "data",
"operation": "insert",
"partition-key-type": "schema-table",
"schema-name": "dbo",
"table-name": "TRFSDIQueue"
}
}
Это сообщение json, которое я отправляю в тему kafka, как показано ниже
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("producer.type", "async");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
System.out.println("Inside loop successfully");
try {
producer.send(
new ProducerRecord<String, String>(topicName, new String(rec.getKinesis().getData().array())));
Thread.sleep(1000);
System.out.println("Message sent successfully");
} catch (Exception e) {
System.out.println("------------Exception message=-------------" + e.toString());
}
finally {
producer.flush();
producer.close();
}
Когда я запускаю kafka connect для эластичного поиска, я получаю ошибку, например
DataException: Converting byte[] to Kafka Connect data failed due to serialization error
Также я изменил quickstart-elasticsearch.properties и изменил сериализатор значения ключа как строку.
Когда это был json, он выдавал ошибку.
Я вижу, что индексы создаются с именем темы kafka в эластичном поиске, но без записи.
Так что, пожалуйста, помогите мне с некоторыми затруднениями. 1. Правильно ли я отправляю сообщение из потока продюсера kinesis? Я использую
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
или я должен использовать здесь json. Но json как такового нет.
Или мне нужно использовать сериализатор json в
quickstart-elasticsearch.properties
?Если событие вставлено, тогда оно будет вставлять запись в поиск elastci, а как насчет удаления и обновления, Kafka-connect обрабатывает удаление и обновление для нас в эластичном поиске?
Заранее спасибо
producer.send().get()
, чтобы сделать его блокирующим вызовом, и даже добавить OnCompleteListener, чтобы узнать, какой раздел и смещение были созданы для ... В любом случае, покажите конфигурацию вашего коннектора. - person OneCricketeer   schedule 01.01.2020