В Mercafacil большинство наших микросервисов полагаются на Kafka для доставки асинхронных событий между микросервисами. Недавно мы столкнулись с ошибкой потребителя Kafka «Координатор не знает об этом участнике», что привело к повторной обработке сообщений в цикле. Такое поведение не только истощает ресурсы, но и может привести к пагубным несоответствиям данных, поскольку у нас есть такие события, как отправка сообщений клиентам или зачисление кэшбэка на кошельки клиентов. В этой статье мы подробно расскажем о проблеме, ее причинах и нашем подходе к ее решению.

TLDR

Для организаций, которые полагаются на сообщения Kafka один раз, обязательно:

  • Постоянно отслеживайте системные журналы на наличие ошибки «Координатор не знает об этом участнике» и создавайте для нее автоматические оповещения.
  • Убедитесь, что session.timeout.ms достаточно велик, чтобы соответствовать самому медленному времени обработки сообщения, и что heartbeat.interval.ms соответствующим образом скорректировано (до 1/3 времени ожидания сеанса).

Распределение проблем

Вышеупомянутая ошибка в основном была вызвана задержкой обработки сообщений в одном из наших микросервисов NestJS, который в значительной степени полагался на внешний API, который время от времени подвергался высокой задержке. Этот сбой привел к тому, что Kafka постоянно повторно потреблял пакет из примерно 20 сообщений в бесконечном цикле, в котором потребитель Kafka постоянно переподключался из-за ошибки.

Техническое решение

Чтобы решить проблему повторяющейся обработки сообщений и вытекающей из этого ошибки, мы использовали двусторонний подход:

  1. Обеспечение уникальности сообщений. Используя дедупликацию на основе UUID, мы гарантировали, что каждое сообщение будет обработано только один раз, независимо от того, было ли оно отправлено Kafka несколько раз.
  2. Уточнение конфигурации Kafka: чтобы устранить корень ошибки, мы точно настроили определенные конфигурации потребителей Kafka, тем самым оптимизировав механизм подтверждения сообщений и предотвратив ошибку в первую очередь.

Для более глубокого понимания этих параметров конфигурации Kafka можно обратиться к официальной документации Kafka.

Дедупликация на основе UUID

Механизм дедупликации был реализован с использованием базы данных «ключ-значение» с простой логикой, такой как:

const msgUUID = kafkaMessage.uuid;
if (!keyValDB.exists(msgUUID)) {
  processMessage(kafkaMessage);
  keyValDB.store(msgUUID);
}
/*
 The message is also acked if it was already processed, preventing
 the reprocess loop
*/

Уточнение конфигурации Kafka

Чтобы предотвратить исходную ошибку, нам нужно было убедиться, что конфигурация session.timeout.ms больше, чем любое возможное сообщение, которое обрабатывает микросервис. Кроме того, следуя рекомендуемым конфигурациям, heartbeat.interval.ms необходимо настроить так, чтобы оно составляло не менее 1/3 нового значения времени ожидания сеанса. Эти параметры описаны в официальной документации как:

heartbeat.interval.ms: ожидаемое время между пульсациями для координатора потребителей при использовании средств управления группой Kafka. Сердцебиения используются для обеспечения того, чтобы сеанс потребителя оставался активным, и для облегчения перебалансировки, когда новые потребители присоединяются к группе или покидают ее. Значение должно быть меньше, чем session.timeout.ms, но обычно не должно превышать 1/3 этого значения. Его можно отрегулировать еще ниже, чтобы контролировать ожидаемое время для обычных перебалансировок.

session.timeout.ms: тайм-аут, используемый для обнаружения сбоев клиента при использовании средства управления группой Kafka. Клиент периодически отправляет тактовые импульсы, чтобы сообщить брокеру о своей активности. Если до истечения этого тайм-аута сеанса брокер не получит тактов, то брокер удалит этого клиента из группы и инициирует повторную балансировку.

Потребитель Kafka был создан с чем-то похожим на:

import { KafkaClient } from 'nestjs-kafka';
const kafkaConfig = new KafkaClient({
  clientId: 'client-id',
  brokers: ['broker:9092'],
  consumer: {
    groupId: 'group-id',
    sessionTimeout: 90000, // large enough to fit any message being processed
    heartbeatInterval: 30000 // 1/3 of the session timeout
  }
});

Альтернативные решения

Другая стратегия заключается в ручном вызове пульса во время обработки сообщения. Хотя это гарантирует, что координатор Kafka постоянно обновляется о статусе потребителя, это может быть сложно реализовать, поскольку ваш код может блокироваться непредсказуемым образом.

Предупреждение об алгоритме предварительной выборки Kafka Consumer

Хотя приведенные выше решения устраняют конкретную ошибку, важно внимательно следить за алгоритмом предварительной выборки Kafka Consumer. Этот алгоритм в определенных конфигурациях может одновременно получать несколько сообщений. При неправильном обращении это может привести к непреднамеренной параллельной обработке и последующему дублированию сообщений.

Некоторые упоминания об этой проблеме можно найти в kafkajs/issues/1325 и Ветке списка рассылки Apache.