Spring Kafka Consumer - Печать информации о задержке Kafka

Я создал Spring-потребителя kafka, который читает из темы. Есть ли способ распечатать информацию о задержке, подобную тому, как мы печатаем информацию о разделах?


person Punter Vicky    schedule 29.09.2017    source источник


Ответы (2)


Есть инструмент командной строки ...

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group myGroup

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
myTopic                        0          66              66              0          -                                           

ИЗМЕНИТЬ

Вы можете запустить инструмент командной строки и записать вывод ...

Process process = new ProcessBuilder()
        .command("/usr/local/bin/kafka-consumer-groups", "--bootstrap-server", "localhost:9092",
                "--describe", "--group", "siTestGroup")
        .start();
InputStream inputStream = process.getInputStream();
process.waitFor(10, TimeUnit.SECONDS);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileCopyUtils.copy(inputStream, baos);
System.out.println(new String(baos.toByteArray()));
person Gary Russell    schedule 29.09.2017
comment
Спасибо, Гэри. Есть ли способ сделать это программно с помощью Spring Kafka. Я хотел вывести задержку как часть сообщений журнала. - person Punter Vicky; 29.09.2017
comment
Не сейчас; в отличие от версии Scala, версия AdminClient для Java не имеет метода describeConsumerGroup(). Вам нужно будет добавить scala jar в путь к классам и написать класс Scala, чтобы делать большую часть того, что в настоящее время находится в ConsumerGroupCommand.KafkaConsumerGroupService. - person Gary Russell; 29.09.2017
comment
Спасибо за помощь, Гэри. Есть ли способ добавить этот запрос функции? - person Punter Vicky; 29.09.2017
comment
Вам нужно будет открыть запрос новой функции в Kafka JIRA, чтобы попросить их реализовать метод на java AdminClient. Если вы просто хотите записать информацию в журнал, я добавил в свой вопрос правку, чтобы можно было обойти ограничение. - person Gary Russell; 29.09.2017

Хотя исходный код не был предоставлен, я предполагаю, что вы реализовали своего потребителя с помощью аннотации @KafkaListener. Я преодолел ту же проблему, которую вы описали, используя интерфейс org.apache.kafka.clients.consumer.Consumer, как указано здесь. Его можно объявить как параметр в методе-потребителе в аннотации @KafkaListener. Этот интерфейс предоставляет метод metrics (), который содержит информацию о задержке потребителя, хранящуюся в свойстве records-max-lag.

private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);

@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
        Consumer<?, ?> consumer) {



    String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
            .map(Metric::metricValue).map(Object::toString).distinct()
            .collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));


    LOGGER.info(lag);


}

В этом случае я явно выбрал свойство records-lag-max. Вы могли выбрать любую другую метрику потребителя, список находится на странице Confluent Docs < / а>.

Приведенный выше фрагмент кода будет иметь следующий результат: [Kafka current consumer lag] X records Где X - максимальное отставание по количеству записей для любого раздела в этом окне.

Важный:

Я использую версию 2.3.3.RELEASE библиотеки Spring Kafka

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>
person Felipe Dias    schedule 11.12.2019