Я создал Spring-потребителя kafka, который читает из темы. Есть ли способ распечатать информацию о задержке, подобную тому, как мы печатаем информацию о разделах?
Spring Kafka Consumer - Печать информации о задержке Kafka
Ответы (2)
Есть инструмент командной строки ...
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group myGroup
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myTopic 0 66 66 0 -
ИЗМЕНИТЬ
Вы можете запустить инструмент командной строки и записать вывод ...
Process process = new ProcessBuilder()
.command("/usr/local/bin/kafka-consumer-groups", "--bootstrap-server", "localhost:9092",
"--describe", "--group", "siTestGroup")
.start();
InputStream inputStream = process.getInputStream();
process.waitFor(10, TimeUnit.SECONDS);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileCopyUtils.copy(inputStream, baos);
System.out.println(new String(baos.toByteArray()));
AdminClient
для Java не имеет метода describeConsumerGroup()
. Вам нужно будет добавить scala jar в путь к классам и написать класс Scala, чтобы делать большую часть того, что в настоящее время находится в ConsumerGroupCommand.KafkaConsumerGroupService
.
- person Gary Russell; 29.09.2017
AdminClient
. Если вы просто хотите записать информацию в журнал, я добавил в свой вопрос правку, чтобы можно было обойти ограничение.
- person Gary Russell; 29.09.2017
Хотя исходный код не был предоставлен, я предполагаю, что вы реализовали своего потребителя с помощью аннотации @KafkaListener. Я преодолел ту же проблему, которую вы описали, используя интерфейс org.apache.kafka.clients.consumer.Consumer, как указано здесь. Его можно объявить как параметр в методе-потребителе в аннотации @KafkaListener. Этот интерфейс предоставляет метод metrics (), который содержит информацию о задержке потребителя, хранящуюся в свойстве records-max-lag.
private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);
@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
Consumer<?, ?> consumer) {
String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
.map(Metric::metricValue).map(Object::toString).distinct()
.collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));
LOGGER.info(lag);
}
В этом случае я явно выбрал свойство records-lag-max. Вы могли выбрать любую другую метрику потребителя, список находится на странице Confluent Docs < / а>.
Приведенный выше фрагмент кода будет иметь следующий результат: [Kafka current consumer lag] X records
Где X - максимальное отставание по количеству записей для любого раздела в этом окне.
Важный:
Я использую версию 2.3.3.RELEASE библиотеки Spring Kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>