Mulesoft с API потоковой передачи Salesforce с использованием CDC

Я работаю над потоком API Mule, тестируя потоки событий Salesforce. У меня настроен коннектор, и я подписан на канал потоковой передачи.

Это отлично работает, когда я создаю / обновляю / удаляю записи контактов, события проходят, и я обрабатываю их, добавляя их в другую базу данных.

введите описание изображения здесь

Меня немного смущает функция replayId. С текущими настройками я могу закрыть приложение Mule, создать контакты в организации, а затем, когда я верну приложение в онлайн, оно возобновится, добавив данные с того места, где оно было остановлено. Идеально.

Однако я пытаюсь смоделировать, что произойдет, если приложение mule выйдет из строя во время обработки событий.

Я запустил APEX, чтобы создать 100 случайных записей контактов. Как только я вижу, что он регистрирует первый поток в моем приложении, я убиваю приложение mule. Мое предположение заключалось в том, что он будет знать, где остановился, когда я возобновлю приложение, как если бы оно было отключено до создания контакта, как в предыдущем тесте.

Я заметил, что он обрабатывает только несколько контактов, которые прошли, прежде чем я закрою приложение.

Похоже, что события могут поступать так быстро во входном потоке, что они уже достигли последнего replayId в потоке. Однако, поскольку эти записи все еще не были добавлены в мою внешнюю базу данных, я теряю эти записи. Поток сделал то, что должен был сделать, но из-за того, что приложение все еще обрабатывает пакет, мои 100 записей не фиксируются, как это отражает replayId.

Как я могу подойти к этому, чтобы не потерять данные в случае большого потока данных до сбоя приложения? Я помню, что с Kafka вы должны были commit идентификатор после того, как он был вставлен в базу данных, чтобы он знал, что последний из них вы официально обработали. Есть ли такая концепция в Mule, где я могу сказать, где я официально остановился и привязался к БД?


person SBB    schedule 10.09.2020    source источник


Ответы (1)


Надежность на уровне протокола (CometD) предполагает ряд свойств. Главный из них - транзакционный ACK (подтверждение) сообщения, полученного подписчиком. CometD поддерживает ACK как расширение. Реализация CometD в Salesforce не поддерживает ACK. Даже если бы это было так, у вас все равно остались бы проблемы ... но частота / потери риска могут быть ниже.

В вашем случае вы должны разработать решение, которое сводится к поиску и воспроизведению событий, которые не были зафиксированы в вашей целевой базе данных. Вы делаете это с помощью специального кода или проводных адаптеров в Mule. Не гарантируется, что значения идентификатора воспроизведения будут непрерывными для последовательных событий, но они будут упорядочены. За событием A с идентификатором воспроизведения 100 последует событие B с идентификатором воспроизведения 200.

Вам нужно будет сохранить значение идентификатора воспроизведения в вашей БД. Затем вы можете использовать его при повторной подписке (после сбоя подписчика) для получения событий из SF, которые отсутствуют в вашей БД. Это будет работать, только если окно отказа достаточно мало. Окно хранения событий Salesforce в настоящее время составляет 24 часа для стандартной лицензии на события платформы. Лицензии более высокого уровня позволяют продлить срок хранения.

В зависимости от объема данных, частоты событий и других параметров процесса вы можете получить все это из коробки с помощью Heroku Connect. Это действительно подразумевает стоимость лицензии на HC и эксплуатационные расходы Postgres DB на Heroku +, но большинство наших клиентов в аналогичных обстоятельствах считают это целесообразным.

person identigral    schedule 10.09.2020
comment
Спасибо за подробный ответ. Можно ли использовать хранилище объектов mule для сохранения последнего известного вставленного идентификатора replayId, который обновляется после того, как я вставляю каждую запись в БД, подтверждая, что это была последняя запись, которую я видел и обрабатывал? Интересно, может ли это быть затем передано как значение для ReplayId, чтобы при следующем запуске приложения оно прочитало это значение из хранилища объектов и продолжило работу оттуда. - person SBB; 10.09.2020
comment
Да, вы могли бы это сделать, если он постоянный (по сравнению с памятью). Это не на 100% безопасно - запись в хранилище объектов может завершиться ошибкой. Это одна из причин, по которой лучше хранить replayID в цели. Причин больше, ACK - это всего лишь одна проблема. Вы также должны иметь дело с возможными дубликатами в воспроизводимых событиях по сравнению с вашей целью. - person identigral; 10.09.2020
comment
Полагаю, я не понимаю привлекательность использования этих событий через Mule, где, похоже, есть много стимулов сделать это для обработки CDC. Во всех примерах, которые я видел, ни один из них на самом деле не упоминает о необходимости сохранения этого идентификатора реле, они просто говорят, что он возобновится с того места, где он остановился в последний раз. Это правда, но CDC привлекает внимание к тому, чтобы взять эти записи и обработать их для синхронизации с другой таблицей, где, похоже, многое может пойти не так из-за отсутствия возможности вручную зафиксировать relayId, когда процесс действительно произошел ( например, вставка записи в другое место) - person SBB; 11.09.2020
comment
Примеры просты нарочно. Реальность немного сложнее .. - person identigral; 11.09.2020