Почему в хранилище состояний возникает ошибка сериализации?

Я использую Kafka Streams 1.1.0.

Я создал следующую топологию:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000001 (topics: [configurationTopicName])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [configurationTopicName-STATE-STORE-0000000000])
      --> KTABLE-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000001
    Processor: KTABLE-MAPVALUES-0000000003 (stores: [configuration_store_application1])
      --> none
      <-- KTABLE-SOURCE-0000000002

Код выглядит следующим образом:

case class Test(name: String, age: Int)
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
builder.table(configurationTopicName, Consumed.`with`(Serdes.String(), Serdes.String()))
  .someAdditionalTransformation
  .mapValues[Test](
      new ValueMapperWithKey[String, String, Test] {
         override def apply(readOnlyKey: String, value: String): Test = Test("aaa", 432)
      }, mal)

Я хотел бы создать запрашиваемое хранилище, которое можно было бы использовать для запроса его позже (получение отфильтрованных / преобразованных значений).

Я провел простой тест с использованием TopologyTestDriver, и возникло следующее исключение:

Есть идеи, почему и как это исправить?

После некоторого расследования я нашел причину вышеуказанного исключения.


person Bartosz Wardziński    schedule 27.12.2018    source источник


Ответы (1)


Я создал Materialized для хранения данных, но я не передавал никаких Serdes для ключа или значения.

Если вы ничего не прошли, используются значения по умолчанию. В моем случае это было StringSerializer, и я пытался сериализовать объект класса Test с помощью StringSerializer mea culpa

Для прохождения Serdes нужно только добавить .withValueSerde(GenericSerde[Test]), где GenericSerdes является реализацией org.apache.kafka.common.serialization.Serde

Вызвано: java.lang.ClassCastException: com.example.kafka.streams.topology.Test не может быть приведен к java.lang.String в org.apache.kafka.common.serialization.StringSerializer.serialize (StringSerializer.java:28) в org.apache.kafka.streams.state.StateSerdes.rawValue (StateSerdes.java:178) на org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue (MeteredKeyValueBytesStore.java:66) в org.apache.kafka.streams.state.internals. kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue (MeteredKeyValueBytesStore.java:57) в org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put (InneralueMeteredKeyValueStore.put) .state.internals.MeteredKeyValueBytesStore.put (MeteredKeyValueBytesStore.java:117) по адресу org.apache.kafka.streams.kstream.internals.KTableMapValues ​​$ KTableMapValuesProcessor.process (KTable.jpg) или потоке3. .internals.KTableMapValues ​​$ KTableMapValuesProcessor.p rocess (KTableMapValues.java:83) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyMetricsNs ( .java: 208) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward (AbstractProcessorContext.java:174 ) в org.apache.kafka.streams.kstream.internals.KTableFilter $ KTableFilterProcessor.process (KTableFilter.java:89) в org.apache.kafka.streams.kstream.internals.KTableFilter $ KTableFilterFilterFilter.process (KTavaFilterProcessor.process63: ) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:208) .apache.kafka.streams.processor.internals.ProcessorNode.process (Proc essorNode.java:124) в org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward (AbstractProcessorContext.java:174) в org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply.java (ForwardingCacheFlushListener.apply. 42) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward (CachingKeyValueStore.java:101) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.access.access $ 000 (CachingKeyValueStore. org.apache.kafka.streams.state.internals. CachingKeyValueStore $ 1.apply (CachingKeyValueStore.java:83) на org.apache.kafka.streams.state.internals.NamedCache.flush (NamedCache.java:142) на org.apache.kafka.streams.state.internals.NamedCache.flush (NamedCache.java:100) в org.apache.kafka.streams.state.internals.ThreadCache.flush (ThreadCache.java:127) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush (CachingKeyValueStore.java : 123) по адресу org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush (InnerMeteredKeyValueStore.java:267) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.java.java: org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush (ProcessorStateManager.java:244) ... еще 58

val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
    .withValueSerde(GenericSerde[Test])
person Bartosz Wardziński    schedule 28.12.2018