Kafka узнает, когда используются связанные сообщения

Есть ли способ в Kafka создать сообщение после использования нескольких связанных сообщений? (без необходимости вручную управлять им в коде приложения ...)

Вариант использования - выбрать огромный файл, разделить его на несколько частей, опубликовать сообщение для каждого из этих фрагментов в теме, и после того, как все эти сообщения будут использованы, создать другое сообщение, уведомляющее о результате по другой теме.

Мы можем сделать это с помощью базы данных или REDIS, чтобы контролировать состояние, но мне интересно, есть ли какой-либо подход более высокого уровня, использующий только экосистему Kafka.


person Luiz Henrique Martins Lins Rol    schedule 11.09.2020    source источник
comment
Как будет выглядеть ваш потребитель, получивший все эти сообщения, и когда все эти сообщения будут приняты? Это тоже приложение Kafka Streams или что-то еще?   -  person mike    schedule 11.09.2020
comment
Первоначально это был бы потребитель приложения Spring-Boot Kotlin, но мы были бы открыты для вариантов ...   -  person Luiz Henrique Martins Lins Rol    schedule 11.09.2020


Ответы (2)


Подход может быть следующим:

  1. После использования каждого фрагмента приложение должно выдать сообщение со статусом (потреблено и номер фрагмента).
  2. Второе приложение (Kafka Streams один раз) должно агрегировать результат, и, когда сообщения обработки со всеми фрагментами создают окончательное сообщение, этот файл обрабатывается.
person Bartosz Wardziński    schedule 14.09.2020
comment
для меня это имеет смысл и звучит многообещающе, но как бы мы однажды в потоках kafka узнали, что все фрагменты были обработаны (извините за мое незнание, никогда на самом деле потоки не использовались). У вас есть документация или фрагмент, указывающий на это? - person Luiz Henrique Martins Lins Rol; 15.09.2020
comment
например: сообщение со статусом блока может иметь следующий вид: `(ключ: fileUniqueName, значение: chunkNumber, numberOfChunks)`. В приложении потоков Kafka вы можете использовать ProcessorApi (kafka.apache .org / 10 / documentation / streams / developer-guide /) и агрегировать его произвольным образом - используя хранилище состояний, вы можете сохранять статус относительно количества обработанных блоков. - person Bartosz Wardziński; 15.09.2020

Вы можете использовать ConsumerGroupCommand, чтобы проверить, завершила ли определенная группа потребителей все сообщения в определенной теме. :

  1. $ kafka-consumer-groups --bootstrap-server broker_host:port --describe --group chunk_consumer

OR

  1. $ kafka-run-class kafka.admin.ConsumerGroupCommand ...

Нулевая задержка для каждого раздела будет указывать на то, что сообщения были успешно использованы, а смещения зафиксированы потребителем.

Кроме того, вы можете подписаться на тему __consumer_offsets и самостоятельно обрабатывать сообщения из нее, но использование ConsumerGroupCommand кажется более простым решением.

person mazaneicha    schedule 11.09.2020
comment
Насколько я понимаю, группы потребителей будут привязаны к конкретному приложению, а не созданы динамически для каждого файла. Я подозреваю, что другой ответ: использование потоков kafka имеет больше смысла для конкретного случая использования. - person Luiz Henrique Martins Lins Rol; 15.09.2020
comment
Не уверен, что я понимаю - фиксация смещения потребителем ЯВЛЯЕТСЯ подтверждением того, что сообщение было успешно использовано. Итак, если со стороны производителя вы отслеживаете смещения и гарантируете, что все смещения зафиксированы, вы знаете, что все ваши фрагменты потребляются. Как только это произойдет, вы можете опубликовать подтверждение или сделать все, что вам нужно. - person mazaneicha; 15.09.2020