Какие гарантии предоставляет Kafka Stream при использовании хранилища состояний RocksDb с журналом изменений?

Я создаю приложение Kafka Streams, которое генерирует события изменения, сравнивая каждый новый вычисленный объект с последним известным объектом.

Поэтому для каждого сообщения в теме ввода я обновляю объект в хранилище состояний и время от времени (используя пунктуацию) применяю вычисление к этому объекту и сравниваю результат с предыдущим результатом вычисления (поступающим из другого хранилища состояний). ).

Чтобы убедиться, что эта операция согласована, я делаю следующее после триггеров с пунктуацией:

  1. записать кортеж в государственный магазин
  2. сравните два значения, создайте события изменения и context.forward их. Так что события переходят в тему результатов.
  3. заменить кортеж на новое_значение и записать его в хранилище состояний

Я использую этот кортеж для сценариев, в которых приложение выходит из строя или перебалансируется, поэтому я всегда могу отправить правильный набор событий, прежде чем продолжить.

Теперь я заметил, что результирующие события не всегда согласованы, особенно если приложение часто меняет баланс. Похоже, что в редких случаях приложение Kafka Streams отправляет события в тему результатов, но тема журнала изменений еще не актуальна. Другими словами, я создал что-то для темы результатов, но моя тема журнала изменений еще не в том же состоянии.

Итак, когда я делаю stateStore.put() и вызов метода завершается успешно, есть ли какие-либо гарантии, когда он будет в теме журнала изменений?

Могу ли я принудительно очистить журнал изменений? Когда я сделаю context.commit(), когда произойдет сброс + фиксация?

процесс


person Tim Van Laer    schedule 31.08.2018    source источник


Ответы (1)


Чтобы получить полную согласованность, вам нужно будет включить processing.guarantee="exaclty_once", иначе при потенциальной ошибке вы можете получить противоречивые результаты.

Если вы хотите остаться с «at_least_once», вы можете использовать одно хранилище и обновить хранилище после обработки (т. Е. После вызова forward()). Это минимизировало временное окно, чтобы получить несоответствия.

И да, если вы вызываете context.commit(), до того, как смещения входной темы будут зафиксированы, все хранилища будут сброшены на диск, и все ожидающие записи производителя также будут сброшены.

person Matthias J. Sax    schedule 02.09.2018
comment
Очень интересно. Я понимаю, что context.commit() запрашивает только фиксацию. Не могли бы вы уточнить, когда именно это произойдет? В настоящее время я делаю context.commit() для каждого сообщения, но, похоже, это не позволяет эффективно фиксировать каждое сообщение ... - person Tim Van Laer; 12.09.2018
comment
После прочтения stackoverflow.com/questions / 50312386 / Я понимаю, какое влияние на эффективность оказывает фиксация каждого сообщения :-) Тем не менее, мне очень любопытно, как и когда внутренние компоненты решают совершить фиксацию. - person Tim Van Laer; 12.09.2018
comment
Это деталь реализации. По сути, есть цикл для обработки последовательных записей. Время от времени мы прерываем этот цикл, чтобы проверить, запрашивал ли пользователь фиксацию, и если да, то выполняем ее. Если вы хотите узнать подробности, вам нужно заглянуть в код. Обратите внимание, что это деталь реализации, и дизайн сделан для более эффективной работы. Кроме того, он меняется между версиями. Текущая реализация trunk: github.com/apache/kafka/blob/trunk/streams/src/main/java/org/ - person Matthias J. Sax; 12.09.2018
comment
Спасибо, Матиас! Очень признателен. - person Tim Van Laer; 13.09.2018