Ошибка при сериализации совокупного хранилища состояний с настраиваемым serde в Spring Cloud Stream

Я пытаюсь создать простой функциональный bean-компонент с помощью Spring Cloud Stream, который обрабатывает сообщения из KStream и GlobalKTable, присоединяется к ним, агрегирует их и выводит результат в новый поток, но у меня возникают трудности с правильной настройкой необходимых serdes. для этого.

Без лишних слов, вот мой метод:

@Bean
public BiFunction<KStream<GenericRecord, GenericRecord>, GlobalKTable<Long, GenericRecord>, KStream<String, MyCustomJavaClass>> joinAndAggregate() {

    return (stream, table) -> stream
            .join(table,
                    (streamKey, streamValue) -> (Long) streamValue.get("something"),
                    (streamValue, tableValue) -> {
                        return new MyCustomJavaClass(streamValue, tableValue);
                    }).selectKey(((key, value) -> (Long) key.get("id")))
            .groupBy((key, value) -> value.getKey(), Grouped.with(Serdes.String(), new MyCustomSerde()))
            .aggregate(() -> {
                return new MyCustomJavaClass();
            }, (key, value, aggregatedValue) -> {
                // aggregation logic
                return new MyCustomJavaClass(aggregatedData);
            }).toStream()
            .peek((k, v) -> {
                if (v == null)
                    log.warn("No value for key:\n" + k.toString() + "\n");
                else
                    log.info("Aggregated result with key:\n" + k + "\nvalue:\n" + v.toString() + "\n");
            });
}

static public final class MyCustomSerde extends JsonSerde<MyCustomJavaClass> { }

Это конфигурация в моем файле свойств:

spring.application.name: test-application
spring.cloud.stream.kafka.binder.brokers: kafka-svc:9092
spring.kafka.properties.schema.registry.url: http://schema-registry-svc:8081
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.function.definition: joinAndAggregate
spring.cloud.stream.bindings.joinAndAggregate-in-0.destination: input-stream
spring.cloud.stream.bindings.joinAndAggregate-in-1.destination: input-global-ktable
spring.cloud.stream.bindings.joinAndAggregate-out-0.destination: aggregate-output
# Serdes
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.application-id: joinAndAggregate-in-0-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.key-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.application-id: joinAndAggregate-in-1-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-out-0.producer.value-serde: com.package.MyClass$MyCustomSerde

Когда я запускаю приведенный выше код, я получаю следующую ошибку:

Failed to process stream task 2_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000011, topic=joinAndAggregate-in-0-v0.1.0-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: 
A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
    ... <omitting some lines here> ...
Caused by: java.lang.ClassCastException: class com.package.model.MyCustomJavaClass cannot be cast to class [B (com.package.model.MyCustomJavaClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)

Класс com.package.model.MyCustomJavaClass находится в другом пакете, чем MyClass, в котором определен метод функционального потока. Может ли это быть проблема?

Я также подтвердил, что MyCustomJavaClass можно сериализовать и десериализовать должным образом, используя пользовательский serde, который вы видите выше (MyCustomSerde). это просто serde, расширяющий JsonSerde. Я могу обрабатывать сообщения со значениями, сериализованными с помощью MyCustomSerde, как на входе, так и на выходе с помощью других функциональных методов, которые я здесь опустил, поэтому сериализатор и пользовательский класс java, который я использую, не являются проблемой. Почему-то только поток агрегированного хранилища состояний имеет проблемы с моим пользовательским serde, и я не могу найти способ исправить это, просмотрев примеры и документацию.

Что я делаю не так?

Заранее спасибо!


person Roberto Francescangeli    schedule 20.05.2020    source источник


Ответы (1)


Когда вы видите такую ​​ошибку:

 serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Это означает, что Kafka Streams использовал (де) сериализатор, который не соответствовал представленным типам. В этом случае Kafka Streams использовал сериализатор по умолчанию Serdes.ByteArraySerde. Если вы обновите свой aggregate метод и добавите третий параметр Materialized.with(Serdes.String(), new MyCustomSerde()), ваше приложение должно избавиться от этой ошибки.


 .aggregate(() -> {
                return new MyCustomJavaClass();
            }, (key, value, aggregatedValue) -> {
                // aggregation logic
                return new MyCustomJavaClass(aggregatedData);
            }, Materialized.with(Serdes.String(), new MyCustomSerde()))

Дай мне знать, как дела.

-Билл

person bbejeck    schedule 21.05.2020
comment
Это сработало! Я думал, что ошибка произошла из-за вызова groupBy (), потому что это произошло в теме перераспределения хранилища состояний, и поэтому я добавил вызов Grouped.with (), но вы правы, это был вызов aggregate ()! Спасибо! - person Roberto Francescangeli; 22.05.2020