Kafka MirrorMaker 2.0 дублирует каждое сообщение

Я пытаюсь реплицировать кластер Kafka с помощью MirrorMaker 2.0. Я использую следующие свойства mm2.properties:

name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2

# for demo, source and target clusters are the same
source.cluster.alias = site1
target.cluster.alias = site2

site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093

site1->site2.enabled = true
site1->site2.topics = topic1


# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

Итак, проблема в том, что mm2 всегда копирует сообщения x3:

# Manual message production: 

 kafkacat -P -b <IP1>:9093,<IP2>:9093,<IP3>:9093,<IP4>:9093 -t "topic1"


# Result in the source topic (site1 cluster): 

% Reached end of topic topic1 [2] at offset 405
Message1
% Reached end of topic topic1 [2] at offset 406
Message2
% Reached end of topic topic1 [6] at offset 408
Message3
% Reached end of topic topic1 [2] at offset 407

 kafkacat -P -b <IP5>:9093,<IP6>:9093,<IP7>:9093,<IP8>:9093 -t "site1.topic1"

# Result in the target topic (site2 cluster): 

% Reached end of topic site1.titi [2] at offset 1216
Message1
Message1
Message1
% Reached end of topic site1.titi [2] at offset 1219
Message2
Message2
Message2
% Reached end of topic site1.titi [6] at offset 1229
Message3
Message3
Message3

Я пробовал использовать Kafka из конфлюентного пакета и kafka_2.13-2.4.0 непосредственно из Apache, оба с Debian 10.1.

Сначала я поддержал такое поведение с помощью confluent 5.4, подумав, что это может быть ошибка в их пакете, поскольку у них есть репликатор и на самом деле они не должны заботиться о мм2, но я воспроизвел точно такую ​​же проблему с kafka_2.13-2.4.0 непосредственно из Apache без каких-либо изменение.

Я знаю, что mm2 еще не идемпотент и не могу гарантировать однократную доставку. В моих тестах (я пробовал много чего, включая настройку производителя или более крупный пакет из тысяч сообщений). Во всех этих тестах мм2 всегда X3 дублирует все сообщения.

Я что-то пропустил, кто-то поощрял то же самое? В качестве примечания к сайту с устаревшим mm1 с теми же пакетами у меня нет этой проблемы.

Ценю любую помощь ... Спасибо!


Даже если журнал изменений не дал мне уверенности в улучшении, я снова попытался запустить mm2, на этот раз с kafka 2.4.1. => Никаких изменений в этих странных дублированиях.

Я установил его на новый сервер, чтобы убедиться, что странное поведение, с которым я столкнулся, не связано с сервером.

Нужны ли мне особые права при использовании ACL? Я поставил "все", думая, что это не может быть более допустимым ... Даже если мм2 не является идемпотентным, да, я попробую правильно, связанное с этим.

Больше всего меня удивляет то, что я не могу найти ничего, что сообщало бы о такой проблеме, наверняка я должен сделать что-то не так, но в чем это вопрос ...


person Aurelien    schedule 31.01.2020    source источник


Ответы (2)


Вам необходимо удалить connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector из вашей конфигурации, потому что это говорит Mirror Maker использовать этот класс для коннекторов Heartbeats и Checkpoints, которые он генерирует вместе с коннектором Source, который реплицирует данные, и этот класс заставляет их вести себя точно так же, как коннектор Source, так что почему вы получаете 3 репликации сообщения каждый раз, вы фактически сгенерировали 3 исходных коннектора.

person FSAN    schedule 09.06.2020
comment
Это не дает ответа на вопрос. Вы можете искать похожие вопросы или обратиться к связанным и связанным вопросам в правой части страницы, чтобы найти ответ. Если у вас есть связанный, но другой вопрос, задайте новый вопрос и включите ссылку на этот, чтобы помочь понять контекст. См .: Задавайте вопросы, получайте ответы, не отвлекаясь - person dippas; 09.06.2020
comment
Я знаю, я бы прокомментировал, а не ответил, но я не могу комментировать, потому что у меня еще недостаточно репутации, и если я создам новый вопрос, он, вероятно, будет отмечен как дубликат, и я думаю, что было бы лучше чтобы сделать существующий вопрос более заметным. Я уже пытался найти похожие вопросы, но не смог. Кроме того, я прочитал руководство по тому, как дать хорошие ответы, и в нем есть кое-что о добавлении информации, даже если вы не можете ответить на вопрос, и я думаю, что то, что я сказал, дает некоторую новую информацию, поэтому я подумал, что это единственный способ для меня поделиться им с ОП. - person FSAN; 09.06.2020
comment
Ты спасатель! Документы KIP-545 о том, как настроить MirrorMaker 2.0 в каждом из трех различных режимов работы, по меньшей мере, сбивают с толку. Как одна запись конфигурации может привести к такому странному поведению? Это должно, по крайней мере, быть отклонено логикой проверки конфигурации. В любом случае, спасибо вам огромное! - person pederpansen; 14.08.2020
comment
Я могу подтвердить, что удаление этой строки было решением. - person Yaya; 03.03.2021

Включение идемпотентности в конфигурацию клиента решит проблему. По умолчанию будет установлено значение false. Добавьте ниже в файл mm2.properties

source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true
person mithun kumar    schedule 14.04.2020