Попытка настроить потребителя и производителя в Кафке

Я пытаюсь создать простой поток между производителем и потребителем через Kafka, используя node-rdkafka

Я использую режим debug: 'all', вот что я получаю из логов:

Производитель: test [0]: MessageSet with 1 message(s) delivered

Потребитель: Fetch topic test [0] at offset 38 (v2)

Тот факт, что потребитель меняет смещение при создании сообщения, заставляет меня поверить, что соединение с брокером настроено и аутентифицировано правильно.

Однако по какой-то причине я не получаю само сообщение от потребителя.

Это событие никогда не вызывается:

consumer.on('data', function(m) {
    console.log("consumed", m)
});

Я создал демонстрационный проект для тестирования, вам нужен брокер Kafka, который поддерживает протокол SASL_SSL, чтобы его использовать:

https://github.com/guysegal/kafka-example

В частности, это потребительский код:

https://github.com/guysegal/kafka-example/blob/master/src/consumer.ts

и код производителя:

https://github.com/guysegal/kafka-example/blob/master/src/producer.ts


person Guy Segal    schedule 20.11.2018    source источник


Ответы (1)


Вы можете установить значение 'auto.offset.reset': 'earliest' и снова запустить потребителя.

auto.offset.reset 

Это свойство определяет начальную позицию потребителя в случае, если потребитель начинает с группы, у которой нет последнего известного состояния. Например, в случае нового идентификатора группы.

Причина изменения значения auto.offset.reset в том, что если вы сначала производите и запускаете потребитель после этого, то смещение темы уже увеличилось, и потребитель будет начинать с последнего смещения (увеличенного смещения) и считывает сообщение с этого момента.

В то время как в случае 'earliest' потребитель начинает с первого доступного сообщения по теме.

person bittu    schedule 20.11.2018