Я работаю над потоком API Mule, тестируя потоки событий Salesforce. У меня настроен коннектор, и я подписан на канал потоковой передачи.
Это отлично работает, когда я создаю / обновляю / удаляю записи контактов, события проходят, и я обрабатываю их, добавляя их в другую базу данных.
Меня немного смущает функция replayId
. С текущими настройками я могу закрыть приложение Mule, создать контакты в организации, а затем, когда я верну приложение в онлайн, оно возобновится, добавив данные с того места, где оно было остановлено. Идеально.
Однако я пытаюсь смоделировать, что произойдет, если приложение mule выйдет из строя во время обработки событий.
Я запустил APEX, чтобы создать 100 случайных записей контактов. Как только я вижу, что он регистрирует первый поток в моем приложении, я убиваю приложение mule. Мое предположение заключалось в том, что он будет знать, где остановился, когда я возобновлю приложение, как если бы оно было отключено до создания контакта, как в предыдущем тесте.
Я заметил, что он обрабатывает только несколько контактов, которые прошли, прежде чем я закрою приложение.
Похоже, что события могут поступать так быстро во входном потоке, что они уже достигли последнего replayId
в потоке. Однако, поскольку эти записи все еще не были добавлены в мою внешнюю базу данных, я теряю эти записи. Поток сделал то, что должен был сделать, но из-за того, что приложение все еще обрабатывает пакет, мои 100 записей не фиксируются, как это отражает replayId
.
Как я могу подойти к этому, чтобы не потерять данные в случае большого потока данных до сбоя приложения? Я помню, что с Kafka вы должны были commit
идентификатор после того, как он был вставлен в базу данных, чтобы он знал, что последний из них вы официально обработали. Есть ли такая концепция в Mule, где я могу сказать, где я официально остановился и привязался к БД?