Плохая новость: если это уже произошло, вы мало что можете сделать. Возможно, производитель сможет повторно обработать данные, но это сильно зависит от вашего приложения. Так что ... извините, я ничем не могу помочь. Вместо этого я постараюсь описать наиболее важную конфигурацию для предотвращения потери данных в Kafka.

Обновление: следующий блог был написан для Kafka ‹2.8.0. KIP-679 изменяет конфигурацию производителя по умолчанию для подтверждения с 1 на все и для enable.idempotence с false на true и планируется для Kafka 3.0.0.

Выражение признательности производителю

Это очень важная конфигурация на уровне производителя. Согласно документации свойство acks:

Количество подтверждений, которые производитель требует, чтобы лидер получил, прежде чем считать запрос завершенным.

Возможные значения: 0, 1, all и -1 (что эквивалентно all).

Что это значит?

Если установить это значение на «0» - значит «выстрелил и забыл». Производитель продолжает свою работу, несмотря на то, что брокеры смогли сохранить сообщение. Определенно не лучшая идея, когда каждое сообщение важно.

Установка «1» означает, что производитель продолжит работу, когда лидер смог сохранить сообщение. Звучит намного лучше, но представьте себе сценарий, когда лидер выходит из строя, а сообщения еще не были реплицированы на другие реплики - эти сообщения будут потеряны.

Значение свойства «все» предполагает, что оно гарантирует, что сообщение сохранялось на всех репликах, но все обстоит сложнее - на самом деле это означает, что оно было написано на всех синхронизированных репликах. Это сбивает с толку и может быть ловушкой. Подробнее об этом позже. В любом случае - «все» (или «-1») - наиболее безопасная настройка.

Производитель повторяет попытку

С помощью параметра retries можно точно настроить, сколько раз производитель должен пытаться отправить сообщение брокеру, если отправка не удалась. Значение по умолчанию - 2147483647, что составляет максимальное число int.

Звучит просто? Ну, как и во многих других сеттингах Кафки, здесь тоже ловушка. На самом деле две ловушки.

Во-первых, есть еще один параметр delivery.timeout.ms, который устанавливает тайм-аут для всех повторных попыток. Таким образом, если retries большое число, но тайм-аут невелик, доставка сообщения в любом случае завершится ошибкой.

Во-вторых, если порядок сообщений важен, очень важно установить для них max.in.flight.requests.per.connection значение 1. Установка значения больше 1 может вызвать переупорядочение сообщений. Подробнее об этом читайте в его отличной статье, в которой подробно объясняется проблема.

Репликация

Кафка предлагает репликацию. Это может быть «палочкой-выручалочкой» в случае банкротства брокера. Следует помнить несколько важных вещей:

  • репликация выполняется на уровне раздела,
  • для каждого раздела есть один лидер и один или несколько последователей,
  • подписчики получают данные от лидера,
  • если производитель acks установлен в all - производитель получает подтверждение, когда сообщение сохраняется на всех синхронизированных репликах.

Из этих фактов можно сделать важные выводы:

  • если у вас установлен высокий уровень репликации с брокерами на нескольких сайтах (например, в нескольких зонах доступности), получение сообщений всеми подписчиками занимает некоторое время.
  • если лидер выходит из строя после того, как он получил сообщение, но до того, как это сообщение было получено последователями, сообщение теряется. Продюсер несет ответственность за воспроизведение этого сообщения. Вот почему настройка acks так важна.

Коэффициент репликации по умолчанию для всех тем можно настроить с помощью default.replication.factor - будьте осторожны, значение по умолчанию - 1!

Коэффициент репликации также можно установить для каждой темы при ее создании. Если кто-то хочет иметь разные настройки для определенных тем, рекомендуется установить для auto.create.topics.enable значение false и создавать темы с помощью скрипта.

Репликация - особые случаи

Есть 2 дополнительных параметра репликации: offsets.topic.replication.factor
и
transaction.state.log.replication.factor

Это настройки брокера для «особых» тем - в первой хранятся потребительские взаимозачеты, а во второй - детали транзакций. Имейте в виду, что для этих тем не используются настройки по умолчанию для обычных тем.

Минимальное количество синхронизируемых реплик

Как уже было сказано, свойство производителя acks определяет, когда кластер Kafka должен подтверждать сообщение, а параметр all более безопасен. Что в данном случае означает слово «все»? Значит all in-sync replicas.

min.insync.replicas означает:

Когда производитель устанавливает для подтверждения значение «все» (или «-1»), min.insync.replicas указывает минимальное количество реплик, которые должны подтвердить запись, чтобы запись считалась успешной. Если этот минимум не может быть соблюден, производитель вызовет исключение (NotEnoughReplicas или NotEnoughReplicasAfterAppend).

Рассмотрим сценарий: мы устанавливаем acks=all и min.insync.replicas=1 (что по умолчанию!). Сеть нестабильна, синхронизируется только лидер (например, другие брокеры теряют соединение с zookeeper). Производитель пишет сообщение, и - согласно min.insync.replicas это сообщение подтверждается. И до того, как другие брокеры вернутся к работе, этот узел выходит из строя из-за сбоя. Что это значит? Это означает, что это сообщение никогда не будет передано другим брокерам и потеряно.

Этот сценарий кажется нереальным, но это реальный живой пример.

Минимальный уровень безопасности для min.insync.replicas - 2. Это может быть опасно, поскольку значение по умолчанию - 1, и его легко забыть изменить.

min.insync.replicas, настроенный на брокере, будет использоваться по умолчанию для всех новых тем (вы можете настроить его для каждой темы).

Опять же, тема транзакции не использует этот параметр, у него есть собственный: transaction.state.log.min.isr.

Выборы нечистого лидера

TL; DR: значение по умолчанию false и никогда не устанавливайте true, если долговечность важнее доступности. Это особенно опасно, если для min.insync.replicas установлено значение 1. Почему? Рассмотрим сценарий:

  1. У вас 3 брокера, broker_1 - лидер.
  2. broker_3 по какой-то причине отключается.
  3. broker_1 удаляет его из списка ISR.
  4. Продюсер продолжает свою работу и пишет несколько сообщений.
  5. broker_1 и broker_2 одновременно отключаются.
  6. broker_3 выздоравливает и снова в сети, становится лидером.
  7. broker_2 восстанавливается и начинает следовать за broker_3.

Что это значит? Сообщения, сохраненные (и подтвержденные!), Когда broker_3 был отключен, теряются.

Установка unclean.leader.election.enable на false предотвращает превращение брокера в лидера, если его нет в списке ISR.

Потребительская автоматическая фиксация

Когда потребитель получает сообщения, он должен «сказать» Hello, I already got this message, please don't give it to me again if I ask for new messages!!. Это делается путем фиксации смещения. Это можно сделать вручную или автоматически. Если enable.auto.commit установлен на true, смещение потребителя будет периодически фиксироваться в фоновом режиме. Таким образом, вы не можете контролировать, когда отправляется смещение, его можно отправить даже до того, как сообщения были обработаны. Легко представить, что произойдет, если потребитель потерпит неудачу - он не получит сообщения, уже «старше», чем зафиксированное смещение, и такие сообщения будут потеряны.

(На самом деле все немного сложнее - смещение сохраняется не для конкретного экземпляра потребителя, а для группы потребителей, но давайте сейчас его немного упростим).

Что произойдет, если вы вручную зафиксируете смещение, а потребитель откажет после обработки сообщения, но до того, как смещение будет зафиксировано? Да, сообщение обрабатывается дважды, в результате чего доставляется не более одного раза. Если важна уникальность, приходится дедуплицировать сообщения во время обработки, но это тема для другой истории.

Сообщения не синхронизируются с диском

Что произойдет, когда все брокеры подтвердят сообщение? Означает ли это, что он уже сохранен на диске? Вообще-то, нет. Значит, это в памяти брокера. Это проблема? Да, это может быть проблемой, когда все брокеры терпят крах одновременно. Это возможно, когда все брокеры находятся в одной зоне доступности, что является худшей практикой.

Обычно сообщения сбрасываются на диск, когда операционная система решает это сделать, но это можно изменить, установив log.flush.interval.messages или log.flush.interval.ms на уровне брокера или flush.messages.flush.ms на уровне темы. Например, установка flush.messages=1 приведет к записи каждого отдельного сообщения на диск. Как вы понимаете, это сильно влияет на производительность, поэтому дважды подумайте, прежде чем это делать.

Дело обстоит еще хуже - если вы синхронизируете данные с диском, они могут храниться в дисковом кеше. Но сценарий, когда все диски у всех брокеров одновременно выходят из строя, практически невозможен. С другой стороны, если кластер плохо спроектирован и все брокеры находятся в одной зоне доступности, сбой питания может вызвать потерю данных.

Резюме

Kafka - это распределенная система, которая при правильной настройке может быть очень надежной. Некоторые выводы:

Настройки производителя

  • acks=all
  • retries=2147483647
  • delivery.timeout.ms=2147483647

Настройки брокеров

  • unclean.leader.election.enable=false
  • default.replication.factor=3
  • min.insync.replicas=2
  • offsets.topic.replication.factor=3
  • transaction.state.log.replication.factor=3
  • transaction.state.log.min.isr=2

Потребительские настройки

  • enable.auto.commit=false

Получите« Начните с электронной книги Apache Kafka »

Мы собрали уроки, полученные при консультировании клиентов и использовании Kafka в коммерческих проектах.

Нужна помощь с Apache Kafka?

Мы являемся сертифицированным техническим партнером Confluent. Наш инженерный опыт в области потоковой обработки и приложений распределенных систем подтвержден в коммерческих проектах, семинарах и консалтинге.

Свяжитесь с нами!