Я новичок в Кафке. Я пытаюсь отправить сообщение в тему Kafka, содержащую заголовок и полезную нагрузку.
Ниже приведена ошибка:
"org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.cabservice.request.CabLocationPayload to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer\nCaused by: java.lang.ClassCastException: class com.cabservice.request.CabLocationPayload cannot be cast to class java.lang.String
Полезная нагрузка: {header: {eventName: CAB-LOCATION, eventId: 3b1i333kiwoskl, timestamp: 1615205167470}, payload: {cabId: cc8, driverId: [email protected], geoLocation: {id: 1234, широта: 78,12, долгота: 45,23 }}}
У меня есть CabLocationPayload, в котором есть поля Header и Payload.
public class CabLocationPayload {
private Header header;
private Payload payload;
// методы получения и установки}
В контроллере
@PostMapping (value = / publish) public void sendMessageToKafkaTopic (@RequestBody CabLocationPayload cabLocationPayload) {
Заголовок и полезная нагрузка имеют поля сопоставления для Json.
После изменения VALUE_SERIALIZER_CLASS_CONFIG в Producer я могу видеть данные. Но все еще не работает с ClassCastException.
{public class KafkaConfiguration {@Bean public ProducerFactory ‹String, String› producerFactoryString () {Map ‹String, Object› configProps = new HashMap ‹› ();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
return new KafkaTemplate<>(producerFactoryString());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}}
Текущая ошибка: {2 09: 41: 20.108 INFO 22561 --- [ad | производитель-1] org.apache.kafka.clients.Metadata: [Производитель clientId = продюсер-1] Идентификатор кластера: lWghv-b_RG-_hO-qOp_cjA 2021-04-22 09: 41: 20.123 ОШИБКА 22561 --- [nio- 9080-exec-2] oaccC [. [. [/]. [DispatcherServlet]: Servlet.service () для сервлета [dispatcherServlet] в контексте с путем [] сгенерировал исключение [Ошибка обработки запроса; вложенное исключение - org.apache.kafka.common.errors.SerializationException: невозможно преобразовать значение класса com.cabservice.request.CabLocationPayload в класс org.apache.kafka.common.serialization.StringSerializer, указанный в value.serializer] с корневым причина
java.lang.ClassCastException: класс com.cabservice.request.CabLocationPayload нельзя преобразовать в класс java.lang.String (com.cabservice.request.CabLocationPayload находится в безымянном модуле загрузчика org.springframework.boot.devtools.restart.classloader. RestartClassLoader @ 1144043d; java.lang.String находится в модуле java.base загрузчика bootstrap) в org.apache.kafka.common.serialization.StringSerializer.serialize (StringSerializer.java:28) ~ [kafka-clients-2.6. 0.jar: na]}
Любая помощь очень ценится.