Я использую spring -boot (2.1.6.RELEASE) с spring-kafka (2.2.7.RELEASE), и я отправляю сообщения в свой кластер kafka с помощью KafkaTemplate. Но иногда (обычно, когда я перезапускаю кафка-брокер или выполняю ребалансировку) я вижу такие ошибки, когда отправляю сообщения:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Из-за настроек производителя Kafka по умолчанию я ожидаю, что ошибки отправки будут повторяться, но это не так. Конфиги производителя Kafka по умолчанию:
retries: 2147483647 (https://kafka.apache.org/documentation/#retries)
acks: 1 (https://kafka.apache.org/documentation/#acks)
Моя конфигурация такова:
@Bean
public Map<String, Object> producerConfigs()
{
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
return props;
}
@Bean
public ProducerFactory<Long, String> producerFactory()
{
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
ProducerFactory<Long, String> producerFactory)
{
KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
return kafkaTemplate;
}
и я отправляю такие сообщения:
kafkaTemplate.send(topicName, key, body);
Я искал по всему Интернету, и все говорят, что эта конфигурация с повторными попытками и подтверждениями должна работать, но это не так. Что мне не хватает?
Спасибо