Что может вызвать огромную нагрузку в теме Kafka `__consumer_offsets`?

У меня есть простое наблюдение в моем кластере Kafka (Kafka 0.11.0.0).

Согласно информации jmx, тема __consumer_offsets постоянно загружается сообщениями в 10 раз больше, чем сумма всех сообщений во всех других темах. Я также подключил к этой теме потребителя консоли, и я могу измерить аналогичные значения.

  • Что может быть причиной?
  • Как я могу проверить, что делает брокер Kafka, и самостоятельно сгенерировать такую ​​нагрузку?

person Seweryn Habdank-Wojewódzki    schedule 10.11.2017    source источник
comment
если вы читаете тему __cosumer_offsets, вы можете узнать, какой groupId загружается. Это должно помочь в исследованиях   -  person Natalia    schedule 10.11.2017
comment
Спасибо за подсказку. Но к сожалению сообщения в этой теме бинарные и я ничего не вижу :-(.   -  person Seweryn Habdank-Wojewódzki    schedule 10.11.2017


Ответы (2)


прочитать тему __consumer_offsets:

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server brokers --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --new-consumer --consumer.config consumer.conf

для kafka 11 используйте форматтер "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

где у consumer.conf одна строка

exclude.internal.topics=false

person Natalia    schedule 10.11.2017
comment
--formatter kafka.coordinator.GroupMetadataManager \ $ OffsetsMessageFormatter --new-consumer Exception в главном потоке java.lang.ClassNotFoundException: kafka.coordinator.GroupMetadataManager $ OffsetsMessageFormatter at java.net.URLClassLoader.jindavalassLoader: .lang.ClassLoader.loadClass (ClassLoader.java:424) в ... в kafka.tools.ConsoleConsumer $ ConsumerConfig. ‹init› (ConsoleConsumer.scala: 318) в kafka.tools.ConsoleConsumer $ .main (ConsoleConsumer.scala: 51) в kafka.tools.ConsoleConsumer.main (ConsoleConsumer.scala) - person Seweryn Habdank-Wojewódzki; 10.11.2017
comment
kafka_2.12-0.11.0.1 - person Seweryn Habdank-Wojewódzki; 10.11.2017
comment
Ага. работает для 9 и 10. Поищу решение в 11 - person Natalia; 10.11.2017
comment
попробуйте kafka.coordinator.group.GroupMetadataManager \ $ OffsetsMessageFormatter - person Natalia; 10.11.2017
comment
Еще один комментарий. Мы нашли причину такой огромной нагрузки. Flink по умолчанию делает синхронизацию кластера / клиента каждые 10 мс! Возможно, это хороший вариант по умолчанию для flink, но он может снизить производительность брокера. - person Seweryn Habdank-Wojewódzki; 22.12.2017

Для версии 2.2 kafka это демонстрация:

kafka_2.12-2.2.1 / bin #

./kafka-console-consumer.sh --bootstrap-server  your-broker   --topic __consumer_offsets --formatter  "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

Результат будет:

[MyGroup-7cc5f948df-f9tqz,__MY_TOPIC,2]::OffsetAndMetadata(offset=30, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1568121107017, expireTimestamp=None)
person ridox    schedule 10.09.2019
comment
Я получаю Exception in thread "main" java.lang.ClassNotFoundException: kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter. Вы знаете, как это исправить? Я использую kafka_2.12-2.3.0. - person jumping_monkey; 15.10.2019