Я создаю приложение Kafka Streams, которое генерирует события изменения, сравнивая каждый новый вычисленный объект с последним известным объектом.
Поэтому для каждого сообщения в теме ввода я обновляю объект в хранилище состояний и время от времени (используя пунктуацию) применяю вычисление к этому объекту и сравниваю результат с предыдущим результатом вычисления (поступающим из другого хранилища состояний). ).
Чтобы убедиться, что эта операция согласована, я делаю следующее после триггеров с пунктуацией:
- записать кортеж в государственный магазин
- сравните два значения, создайте события изменения и
context.forward
их. Так что события переходят в тему результатов. - заменить кортеж на новое_значение и записать его в хранилище состояний
Я использую этот кортеж для сценариев, в которых приложение выходит из строя или перебалансируется, поэтому я всегда могу отправить правильный набор событий, прежде чем продолжить.
Теперь я заметил, что результирующие события не всегда согласованы, особенно если приложение часто меняет баланс. Похоже, что в редких случаях приложение Kafka Streams отправляет события в тему результатов, но тема журнала изменений еще не актуальна. Другими словами, я создал что-то для темы результатов, но моя тема журнала изменений еще не в том же состоянии.
Итак, когда я делаю stateStore.put()
и вызов метода завершается успешно, есть ли какие-либо гарантии, когда он будет в теме журнала изменений?
Могу ли я принудительно очистить журнал изменений? Когда я сделаю context.commit()
, когда произойдет сброс + фиксация?