kafka смещение и отставание неизвестны для некоторых разделов темы

введите здесь описание изображения

Я использую потребителя из https://github.com/confluentinc/confluent-kafka-go. Версия кафки - 0.10.1.0.

Вот конфигурация моего потребителя:

kafkaClient, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               broker,
    "group.id":                       "udwg20",
    "session.timeout.ms":              60000,
    "go.events.channel.enable":        true,
    "go.application.rebalance.enable": true,
    "default.topic.config": kafka.ConfigMap{
        "auto.offset.reset":      "earliest",
        "enable.auto.commit":      true,
        "auto.commit.interval.ms": 10000}})

Вначале отображалось все текущее смещение и задержка, но после нескольких часов работы смещение и задержка некоторых разделов (которые не получали никаких новых сообщений) становятся неизвестными. Если в раздел поступило сообщение, смещение и задержка которого неизвестны, смещение и задержка снова будут видны, и сообщение будет использовано.

Когда есть несколько разделов с неизвестным текущим смещением и задержкой, я перезапускаю потребителя, в это время все текущие разделы с неизвестным текущим смещением и задержкой будут использоваться снова с самого начала, но другие разделы, похоже, работают нормально.

Я также использовал потребителя python, который получает сообщение из этой темы с другим идентификатором группы потребителей. Потребитель python, похоже, хорошо работает без какого-либо раздела с неизвестным текущим смещением и задержкой.


person hongnguyenhuu96    schedule 02.10.2018    source источник


Ответы (2)


offsets.retention.minutes используется для очистки неактивных групп потребителей. Если группа потребителей не фиксирует какое-либо смещение для offsets.retention.minutes (по умолчанию 24 часа), kafka очистит свое смещение. Вот почему для смещения и журнала установлено значение unknown.

Вы можете увеличить срок хранения смещения, однако имейте в виду, что старые потребители будут резервировать место в __consumer_offsets теме.

person Giorgos Myrianthous    schedule 02.10.2018
comment
Я установил offsets.retention.minutes = 10080 (7 дней) в конфигурации и перезапустил kafka, но после работы примерно от 3 до 5 часов текущее смещение и задержка по-прежнему меняются на неизвестные. Я не думаю, что моя группа потребителей неактивна, поскольку текущее смещение и задержка были показаны для некоторых разделов, это доказывает, что моя группа потребителей активна. - person hongnguyenhuu96; 02.10.2018
comment
@ hongnguyenhuu96, а вы подтверждаете, что в теме смещения используется уплотнение, а брокер настроен на включение уплотнения? - person Giorgos Myrianthous; 02.10.2018
comment
да. В server.properties. log.cleanup.policy=compact offsets.retention.minutes=10080 Я также использовал потребителя python, который выполняет ту же функцию (я хочу преобразовать код этого потребителя python в golang для повышения производительности). Для потребителя Python нет раздела с неизвестным смещением и задержкой. Думаю, проблема в библиотеке. Теперь я пытаюсь использовать api опроса вместо использования события go chanel. - person hongnguyenhuu96; 02.10.2018
comment
Я пробовал использовать api consumer.poll, но через 2 часа это все еще происходит. Текущее смещение и отставание теперь неизвестны для некоторых разделов. - person hongnguyenhuu96; 02.10.2018
comment
@ hongnguyenhuu96 То же происходит и с вашим потребителем Python? - person Giorgos Myrianthous; 02.10.2018
comment
Нет, для потребителя python текущее смещение и задержка всех разделов ясны. - person hongnguyenhuu96; 02.10.2018
comment
перейти к потребителю: изображение потребителя Python : изображение - person hongnguyenhuu96; 02.10.2018

Я использую приведенную ниже команду, чтобы увидеть, фиксируется ли смещение идентификатора моей группы потребителей периодически или нет.

echo exclude.internal.topics=false > consumer.properties

kafka-console-consumer --consumer.config consumer.properties --from-beginning --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

Хотя я установил для enable.auto.commit значение true, он не выполняет периодическую фиксацию для разделов, для которых его lag = 0. Текущее смещение для этих разделов удаляется через 2-3 часа, даже если группа потребителей все еще активна.

Чтобы решить эту проблему, я установил enable.auto.commit в false и написал свою собственную функцию для фиксации смещения каждые 5 секунд.

Вот идеал: когда потребитель получает новое событие Message или достигает события конца раздела (PartitionEOF), из данных события я сохраняю последнее текущее смещение в карте фиксации (ключ: topic_partition значение: kafka.TopicPartition{ Topic, Partition, Offset }) и там - это функция для периодической фиксации этой карты (может быть каждые 5 секунд). Когда потребитель получает событие RevokedPartitions, я удаляю соответствующий ключ topic_partition из карты фиксации.

person hongnguyenhuu96    schedule 10.10.2018