Kafka Streams: смешивание и сопоставление PAPI и DSL KTable без совместного разделения

У меня есть смешанная топология Scala, где основным рабочим является процессор PAPI, а другие части подключаются через DSL.

EventsProcessor: 
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)

Данные по всем темам (включая исходный eventsTopic) разделены через, назовем его DoubleKey, который имеет два поля. Посетители отправляются на visitorsTopic через приемник:

.addSink(VISITOR_SINK_NAME, visitorTopicName,
    DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)

В DSL я создаю KV KTable по этой теме:

val visitorTable = builder.table(
  visitorTopicName,
  Consumed.`with`(DoubleKey.getKafkaSerde(),
  Visitor.getKafkaSerde()),
  Materialized.as(visitorStoreName))

который я позже подключаю к EventProcessor:

topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)

Все разделяется на разделы (через DoubleKey). visitorSinkPartitioner выполняет типичную операцию по модулю:

Math.abs(partitionKey.hashCode % numPartitions)

В обработчике событий PAPI я запрашиваю эту таблицу, чтобы узнать, есть ли уже существующие посетители.

Однако в моих тестах (с использованием EmbeddedKafka, но это не должно иметь значения) если я запустил их с одним разделом, все в порядке (EventsProcessor проверяет KTable на двух событиях на одном и том же DoubleKey, а на втором событии - с некоторой задержкой - он может увидеть существующий Visitor в магазине), но если я запустил его с чем выше число, обработчик событий никогда не увидит значение в Магазине.

Однако, если я проверю магазин через API (итерация store.all()), запись будет там. Я так понимаю, что это должно быть в другом разделе.

Поскольку KTable должен работать с данными в своем разделе, и все отправляется в один и тот же раздел (с использованием явных разделителей, вызывающих один и тот же код), KTable должен получить эти данные в том же разделе.

Мои предположения верны? Что могло случиться?

KafkaStreams 1.0.0, Scala 2.12.4.

PS. Конечно, он будет работать, выполняя put в PAPI, создавая хранилище через PAPI вместо StreamsBuilder.table(), поскольку это определенно будет использовать тот же раздел, где выполняется код, но это исключено.


person xmar    schedule 26.07.2018    source источник


Ответы (1)


Да, предположения были правильными.

Если это кому-то поможет:

У меня возникла проблема при передаче Partitioner в библиотеку Scala EmbeddedKafka. В одном из наборов тестов это было сделано некорректно. Теперь, следуя всегда здоровой практике рефакторинга, я использую этот метод во всех наборах этой топологии.

def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) : 
    EmbeddedKafkaConfig = {
    val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
        classOf[DoubleKeyPartitioner].getCanonicalName)
    EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, 
        customProducerProperties = producerProperties)
}
person xmar    schedule 27.07.2018