Ошибка при десериализации объекта при отправке в тему Kafka

Я новичок в Кафке. Я пытаюсь отправить сообщение в тему Kafka, содержащую заголовок и полезную нагрузку.

Ниже приведена ошибка:

"org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.cabservice.request.CabLocationPayload to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer\nCaused by: java.lang.ClassCastException: class com.cabservice.request.CabLocationPayload cannot be cast to class java.lang.String 

Полезная нагрузка: {header: {eventName: CAB-LOCATION, eventId: 3b1i333kiwoskl, timestamp: 1615205167470}, payload: {cabId: cc8, driverId: [email protected], geoLocation: {id: 1234, широта: 78,12, долгота: 45,23 }}}

У меня есть CabLocationPayload, в котором есть поля Header и Payload.

public class CabLocationPayload {

private Header header;

private Payload payload;

// методы получения и установки}

В контроллере

@PostMapping (value = / publish) public void sendMessageToKafkaTopic (@RequestBody CabLocationPayload cabLocationPayload) {

Заголовок и полезная нагрузка имеют поля сопоставления для Json.

После изменения VALUE_SERIALIZER_CLASS_CONFIG в Producer я могу видеть данные. Но все еще не работает с ClassCastException.

{public class KafkaConfiguration {@Bean public ProducerFactory ‹String, String› producerFactoryString () {Map ‹String, Object› configProps = new HashMap ‹› ();

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    return new KafkaTemplate<>(producerFactoryString());
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

}}

Текущая ошибка: {2 09: 41: 20.108 INFO 22561 --- [ad | производитель-1] org.apache.kafka.clients.Metadata: [Производитель clientId = продюсер-1] Идентификатор кластера: lWghv-b_RG-_hO-qOp_cjA 2021-04-22 09: 41: 20.123 ОШИБКА 22561 --- [nio- 9080-exec-2] oaccC [. [. [/]. [DispatcherServlet]: Servlet.service () для сервлета [dispatcherServlet] в контексте с путем [] сгенерировал исключение [Ошибка обработки запроса; вложенное исключение - org.apache.kafka.common.errors.SerializationException: невозможно преобразовать значение класса com.cabservice.request.CabLocationPayload в класс org.apache.kafka.common.serialization.StringSerializer, указанный в value.serializer] с корневым причина

java.lang.ClassCastException: класс com.cabservice.request.CabLocationPayload нельзя преобразовать в класс java.lang.String (com.cabservice.request.CabLocationPayload находится в безымянном модуле загрузчика org.springframework.boot.devtools.restart.classloader. RestartClassLoader @ 1144043d; java.lang.String находится в модуле java.base загрузчика bootstrap) в org.apache.kafka.common.serialization.StringSerializer.serialize (StringSerializer.java:28) ~ [kafka-clients-2.6. 0.jar: na]}

Любая помощь очень ценится.


person Srikanth Sridhar    schedule 22.04.2021    source источник
comment
Вы делитесь кодами конфигурации kafka? (производительFactory, kafkaTemplate и т. д.)   -  person Süleyman Can    schedule 22.04.2021


Ответы (2)


Конфигурация Kafka 'value.serializer' должна быть подклассом Serializer, а не типом вашего объекта

Например

ключ: VALUE_SERIALIZER_CLASS_CONFIG, значение: JsonSerializer.class (источник: org.springframework.kafka.support.serializer)

Пример конфигурации производителя:

    @EnableKafka
    @Configuration
    public class KafkaProducerConfiguration {
    
        @Bean
        KafkaTemplate<String, Object> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            return new DefaultKafkaProducerFactory<>(getConfig());
        }
    
        private Map<String, Object> getConfig() {
            Map<String, Object> config = new HashMap<>();
    
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokers");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return config;
        }
    
    }

Пример конфигурации потребителя:

Вам нужно заменить Yourclass на имя класса, который вы хотите использовать. (для этого примера: CabLocationPayload)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfiguration {


    private Map<String, Object> consumerConfigs() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your brokers");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumeer-group-id");
        return props;
    }

    @Bean
    public ConsumerFactory<String, YourClass> kafkaListenerConsumerFactory() {
        final ErrorHandlingDeserializer<YourClass> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(YourClass.class, false));
        return new DefaultKafkaConsumerFactory<>(this.consumerConfigs(), new StringDeserializer(), errorHandlingDeserializer);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, YourClass> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, YourClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.kafkaListenerConsumerFactory());
        return factory;
    }

}
person Süleyman Can    schedule 22.04.2021
comment
Это работает на Продюсера. Но по-прежнему не работает с ClassCastException. Не могли бы вы проверить моего потребителя, я отредактировал исходное сообщение. - person Srikanth Sridhar; 22.04.2021
comment
Я также добавил в свой ответ код конфигурации Consumer. Вопрос касался продюсера, и я думаю, что мой первый ответ сработал. Подтвердите ли вы ответ на пользу всем? - person Süleyman Can; 23.04.2021

Конфиги Kafka перенесены в application.properties

spring.kafka.consumer.bootstrap-servers: localhost: 9092 spring.kafka.consumer.group-id: group-id spring.kafka.consumer.auto-offset-reset: самый ранний spring.kafka.consumer.key-deserializer: org .apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages = *

spring.kafka.producer.bootstrap-servers: localhost: 9092 spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer: org.springframework.kafka. support.serializer.JsonSerializer

Не уверен, что spring.kafka.consumer.properties.spring.json.trusted.packages=* имеет значение.

person Srikanth Sridhar    schedule 22.04.2021