Производитель шаблонов Java Spring Kafka потерял сообщения при перезапуске брокера

Я использую spring -boot (2.1.6.RELEASE) с spring-kafka (2.2.7.RELEASE), и я отправляю сообщения в свой кластер kafka с помощью KafkaTemplate. Но иногда (обычно, когда я перезапускаю кафка-брокер или выполняю ребалансировку) я вижу такие ошибки, когда отправляю сообщения:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Из-за настроек производителя Kafka по умолчанию я ожидаю, что ошибки отправки будут повторяться, но это не так. Конфиги производителя Kafka по умолчанию:

retries: 2147483647  (https://kafka.apache.org/documentation/#retries)
acks: 1               (https://kafka.apache.org/documentation/#acks)

Моя конфигурация такова:

@Bean
    public Map<String, Object> producerConfigs()
    {
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return props;
    }

    @Bean
    public ProducerFactory<Long, String> producerFactory()
    {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
                                                     ProducerFactory<Long, String> producerFactory)
    {
        KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
        return kafkaTemplate;
    }

и я отправляю такие сообщения:

kafkaTemplate.send(topicName, key, body);

Я искал по всему Интернету, и все говорят, что эта конфигурация с повторными попытками и подтверждениями должна работать, но это не так. Что мне не хватает?

Спасибо


person Georgi Staykov    schedule 05.12.2020    source источник


Ответы (1)


Потратив некоторое время на отладку, я нашел решение:

props.put(ProducerConfig.ACKS_CONFIG, "all");

Для получения дополнительной информации об этом свойстве: https://kafka.apache.org/documentation/#acks


Очень хороший блог, в котором показаны различные сценарии потери сообщений в кафке:



Дополнительное примечание - из этот ответ. Я обнаружил, что это хорошая идея, если вы не хотите терять сообщения при завершении работы:

@PreDestroy
public void flush()
{
    kafkaTemplate.flush();
}
person Georgi Staykov    schedule 09.12.2020