Прочитайте тему Кафки с начала

У меня есть приложение весенней загрузки, которое продолжает читать все сообщения из определенной темы. Моя проблема в том, что у меня проблема в том, что при перезапуске приложения ему нужно заново прочитать все сообщения из темы. Я думал, что эти два варианта вместе сделают это, но это не работает:

  • resetOffsets: правда
  • startOffset: самый ранний

Вот мой метод;

 @StreamListener(myTopic)
 public void handle(@Payload Input input) {
    /** Do other stuff not related **/
 }

Вот мое приложение.yaml

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: group-topic
        concurrency: 1
        resetOffsets: true
        startOffset: earliest

person Victor    schedule 09.11.2020    source источник
comment
Вам нужно изменить группу потребителей (конфигурация group), чтобы иметь возможность читать с самого начала. В противном случае две опции resetOffset и startupOffset не будут иметь никакого значения.   -  person mike    schedule 10.11.2020
comment
Я не мог понять, что я должен изменить. Не могли бы вы немного подробнее? Извините, я новичок в Kafka.   -  person Victor    schedule 10.11.2020
comment
Привет, Виктор, нет проблем. Добавили более подробную информацию в ответ ниже.   -  person mike    schedule 10.11.2020


Ответы (1)


Вам нужно изменить group в вашем application.yaml на новое и уникальное имя группы (см. пример ниже, где я установил для группы потребителей значение new-consumer-group-id):

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: new-consumer-group-id
        concurrency: 1
        resetOffsets: true
        startOffset: earliest

Если вы продолжите использовать ту же ConsumerGroup, конфигурации resetOffset и startingOffset не окажут никакого влияния.

В качестве альтернативы вы можете сбросить смещения для каждой группы потребителей с помощью инструмента командной строки kafka-consumer-groups.sh. Шаблон показан ниже:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
 --execute --reset-offsets \
 --group groupA\
 --topic topicC \
 --partition 0 \
 --to-offset <insert number>
person mike    schedule 10.11.2020
comment
Итак, в этом случае каждый раз, когда мое приложение запускается, у него должна быть другая группа потребителей? Например, добавление временной метки в конце имени, определенного в yaml? - person Victor; 10.11.2020
comment
да, это было бы решением. В противном случае вы можете использовать инструмент командной строки, который поставляется с Kafka. Я обновил свой ответ соответственно. - person mike; 10.11.2020
comment
Привет @mike, да, это ответ. Большое спасибо. - person Victor; 16.11.2020