У меня есть смешанная топология Scala, где основным рабочим является процессор PAPI, а другие части подключаются через DSL.
EventsProcessor:
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)
Данные по всем темам (включая исходный eventsTopic
) разделены через, назовем его DoubleKey
, который имеет два поля. Посетители отправляются на visitorsTopic
через приемник:
.addSink(VISITOR_SINK_NAME, visitorTopicName,
DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
В DSL я создаю KV KTable по этой теме:
val visitorTable = builder.table(
visitorTopicName,
Consumed.`with`(DoubleKey.getKafkaSerde(),
Visitor.getKafkaSerde()),
Materialized.as(visitorStoreName))
который я позже подключаю к EventProcessor
:
topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
Все разделяется на разделы (через DoubleKey). visitorSinkPartitioner
выполняет типичную операцию по модулю:
Math.abs(partitionKey.hashCode % numPartitions)
В обработчике событий PAPI я запрашиваю эту таблицу, чтобы узнать, есть ли уже существующие посетители.
Однако в моих тестах (с использованием EmbeddedKafka, но это не должно иметь значения) если я запустил их с одним разделом, все в порядке (EventsProcessor проверяет KTable на двух событиях на одном и том же DoubleKey
, а на втором событии - с некоторой задержкой - он может увидеть существующий Visitor
в магазине), но если я запустил его с чем выше число, обработчик событий никогда не увидит значение в Магазине.
Однако, если я проверю магазин через API (итерация store.all()
), запись будет там. Я так понимаю, что это должно быть в другом разделе.
Поскольку KTable должен работать с данными в своем разделе, и все отправляется в один и тот же раздел (с использованием явных разделителей, вызывающих один и тот же код), KTable должен получить эти данные в том же разделе.
Мои предположения верны? Что могло случиться?
KafkaStreams 1.0.0, Scala 2.12.4.
PS. Конечно, он будет работать, выполняя put
в PAPI, создавая хранилище через PAPI вместо StreamsBuilder.table()
, поскольку это определенно будет использовать тот же раздел, где выполняется код, но это исключено.