Как реализовать простого потребителя Kafka, который обнаруживает и отбрасывает повторяющиеся сообщения.

Потребители сообщений очень часто сталкиваются с манипуляциями с государством. Брокеры сообщений могут отправлять повторяющиеся сообщения по разным причинам. Таким образом, потребители должны овладеть искусством «идемпотентной обработки сообщений», чтобы

преодолеть серьезные потоки в состоянии приложения.

В этом посте я обсуждаю простой механизм обнаружения и удаления повторяющихся сообщений при обработке сообщений.

Почему мы получаем одно и то же сообщение более одного раза?

Цикл выполнения типичного потребителя сообщения будет выглядеть так.

Потребитель повторно выполняет следующие действия.

  • Получите сообщение от брокера.
  • Обработайте сообщение, выполнив некоторую бизнес-логику.
  • Подтвердите брокеру сообщение о том, что оно было успешно обработано и не должно быть доставлено повторно.

Однако мы не живем в идеальном мире. Неудачи следует ожидать. В нашем случае подтверждение обработки может не дойти до брокера по нескольким причинам.

Из-за сбоя потребителя. Пользователь сообщения может аварийно завершить работу в середине обработки. Следовательно, не удалось отправить подтверждение.

Из-за сбоя брокера сообщений. К тому времени, когда подтверждение потребителя доходит до брокера, он мог дать сбой или перейти в автономный режим. Таким образом, он упускает из виду признание.

Из-за сбоя сети. Клиент и брокер доступны. Но подтверждение теряется при передаче из-за какой-то сетевой ошибки.

Обычно брокер сообщений обязуется доставить сообщение адресату хотя бы один раз. Таким образом, в любом из вышеперечисленных случаев брокер пытается повторно доставить неподтвержденные сообщения. Таким образом, потребитель может получить одно и то же сообщение более одного раза.

Почему важно использовать идемпотентные сообщения

Потребитель считается идемпотентным, если он дает одинаковый результат даже после нескольких вызовов.

Природа логики обработки заставляет потребителя быть идемпотентным или нет. Например, обработка без сохранения состояния, такая как вычисление значения на основе входных данных, естественно идемпотентна.

Но некоторые операции нужно явно сделать идемпотентными. Особенно операции, которые касаются государства. Повторная обработка сообщений может нанести серьезный ущерб состоянию потребителя. Например, потребитель, обновляющий баланс счета, должен отказаться от повторяющихся событий вывода средств. В противном случае счет будет списан некорректно.

Делаем потребителей идемпотентными

Фундаментальный принцип идемпотентного потребителя - отслеживать сообщения, которые он обработал.

Исходя из этого, давайте создадим простую платформу для достижения этой цели. Фреймворк требует следующего.

1. Уникальный идентификатор сообщения: производители должны добавить уникальный идентификатор к сообщениям при публикации в брокере. Это может быть что-то вроде UUID или случайная строка. Дело в том, чтобы однозначно идентифицировать сообщение. Насколько мне известно, некоторые спецификации обмена сообщениями, такие как JMS, поддерживают это естественным образом.

2. Место для хранения идентификаторов сообщений. Потребитель должен взять на себя ответственность за надежную запись идентификаторов сообщений, которые он успешно обработал. Самый простой способ - создать таблицу message_log в базе данных. Я объясню это в ближайшее время.

3. Логика обнаружения дубликатов: Наконец, потребитель должен проверить таблицу message_log перед обработкой любого полученного сообщения. Если таблица уже содержит идентификатор сообщения, обработки не должно происходить. В противном случае потребитель обрабатывает сообщение и записывает его идентификатор в таблицу message_log.

Ниже показана простая структура, описанная выше.

Реализация с Quarkus, MySQL и Kafka

Я сделал эталонную реализацию для этого на основе потребителя сообщений, написанного на Quarkus. Этот образец Дебезиума сильно повлиял на это.

Он считывает сообщения из темы Kafka и просто регистрирует содержимое сообщения, после чего записывает идентификатор сообщения в таблицу Consmed_messages в базе данных MySQL. Вы можете найти исходный код готового проекта здесь.

Таблица consmed_messages

Определение таблицы простое и имеет следующую структуру. Столбцы говорят сами за себя.

CREATE TABLE `consumed_messages` (
`eventId` varchar(255) NOT NULL,
`timeOfReceiving` datetime(6) DEFAULT NULL,
PRIMARY KEY (`eventId`)
) ENGINE=InnoDB;

ORM-классы на основе Hibernate

Я создал классы ConsumedMessage и MessageLog для сопоставления таблицы с Hibernate, чтобы я мог избежать написания вручную свернутых операторов SQL для решения этой проблемы.

Кроме того, Quarkus предоставляет аннотации JPA для обозначения методов, которые должны быть транзакционными. В приведенном ниже классе MessageLog оба метода выполняются в рамках транзакции. Если что-то пойдет не так, например, нарушение уникального ограничения на уровне таблицы, мы можем выполнить чистый откат.

Обработчик событий

Приведенный ниже класс OrderEventHandler получает объект OrderEvent и проверяет, был ли уже обработан идентификатор события. В случае обработки событие возвращается. В противном случае вызывается соответствующая бизнес-логика, и событие будет помечено как обработанное.

Обратите внимание, что метод onOrderEvent () также сделан транзакционным для надежности и согласованности. Если бизнес-логика выдает ошибку или сохранение идентификатора сообщения в Consmed_messages по какой-то причине не удается, мы выполним полный откат.

Потребитель событий Kafka

KafkaEventConsumer - это класс уровня инфраструктуры. Он считывает сообщения из темы заказы, ​​десериализует закодированный в JSON контент как OrderEvents и передает их OrderEventHandler. Вы можете найти исходный код здесь.

Событие заказа

Типичное событие заказа, исходящее от Kafka, будет выглядеть следующим образом. У него уникальный eventId.

{
"eventId":"e7960360-bd18-408f-a0f4-c4047ee17403",
"eventType":"ORDER_CREATED",
"payload":{
    "id":12345,
    "customerId":"12df-dfdf-223",
    "status":"CREATED"
  }
}

Альтернативные подходы

В своем блоге Крис Ричардсон объясняет два альтернативных подхода к реализации.

Один из подходов использует таблицу потребляемые_сообщения, которая вводится с помощью идентификатора сообщения. Потребитель пытается вставить идентификатор полученного сообщения в эту таблицу с последующим обновлением таблицы приложения. Если идентификатор уже существует, транзакция базы данных завершится ошибкой, позволяя потребителю откатить все и подтвердить.

Второй подход больше ориентирован на базы данных NoSQL, где транзакции ACID ограничены. Там идентификатор сообщения хранится как атрибут в той же обновляемой таблице.

Выводы

Посредник сообщений может повторно доставлять одно и то же сообщение. Обработка дублирующихся сообщений может вызвать серьезные ошибки у потребителей с отслеживанием состояния.

Чтобы сделать потребление сообщений идемпотентным, потребители должны записывать успешно обработанные сообщения и отбрасывать дубликаты.

Крис Ричардсон назвал это моделью идемпотентного потребителя.