Я использую Kafka Streams 1.1.0.
Я создал следующую топологию:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000001 (topics: [configurationTopicName])
--> KTABLE-SOURCE-0000000002
Processor: KTABLE-SOURCE-0000000002 (stores: [configurationTopicName-STATE-STORE-0000000000])
--> KTABLE-MAPVALUES-0000000003
<-- KSTREAM-SOURCE-0000000001
Processor: KTABLE-MAPVALUES-0000000003 (stores: [configuration_store_application1])
--> none
<-- KTABLE-SOURCE-0000000002
Код выглядит следующим образом:
case class Test(name: String, age: Int)
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
builder.table(configurationTopicName, Consumed.`with`(Serdes.String(), Serdes.String()))
.someAdditionalTransformation
.mapValues[Test](
new ValueMapperWithKey[String, String, Test] {
override def apply(readOnlyKey: String, value: String): Test = Test("aaa", 432)
}, mal)
Я хотел бы создать запрашиваемое хранилище, которое можно было бы использовать для запроса его позже (получение отфильтрованных / преобразованных значений).
Я провел простой тест с использованием TopologyTestDriver
, и возникло следующее исключение:
Есть идеи, почему и как это исправить?
После некоторого расследования я нашел причину вышеуказанного исключения.