Потребители Apache Beam KafkaIO в группе потребителей читают одно и то же сообщение

Я использую KafkaIO в потоке данных для чтения сообщений из одной темы. Я использую следующий код.

KafkaIO.<String, String>read()
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
                .build())
//                .commitOffsetsInFinalize()
                .withTopics(Collections.singletonList(topicNames))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata();

Я запускаю программу потока данных на своем локальном компьютере с помощью прямого бегуна. Все работает нормально. Я запускаю другой экземпляр той же программы параллельно, то есть другого потребителя. Теперь я вижу повторяющиеся сообщения при обработке конвейера.

Хотя я предоставил идентификатор группы потребителей, запуск другого потребителя с тем же идентификатором группы потребителей (другой экземпляр той же программы) не должен обрабатывать те же элементы, которые обрабатываются другим потребителем, верно?

Как это получается при использовании бегуна потока данных?


person bigbounty    schedule 16.05.2020    source источник


Ответы (1)


Я не думаю, что установленные вами параметры гарантируют неповторяющуюся доставку сообщений по конвейерам.

  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG: это флаг для потребителя Kafka, а не для самого конвейера Beam. Похоже, это лучший способ и периодический, поэтому вы все равно можете видеть дубликаты в нескольких конвейерах.

  • withReadCommitted (): это просто означает, что Beam не будет читать незафиксированные сообщения. Опять же, это не предотвратит дублирование в нескольких конвейерах.

См. здесь для протокола, используемого источником луча для определения начальной точки источника Kafka.

Чтобы гарантировать неповторяющуюся доставку, возможно, вам придется читать по разным темам или по разным подпискам.

person chamikara    schedule 18.05.2020
comment
Даже если я столкнулся с этой проблемой, я хочу запустить несколько экземпляров потребителя, читающих из той же темы. Но сообщение доставляется во все запущенные экземпляры. После найденной мной конфигурации отладки к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы. ex - group.id = Reader-0_offset_consumer_559337182_my_group group.id = Reader-0_offset_consumer_559337345_my_group Таким образом, каждому экземпляру назначен уникальный group.id, и поэтому сообщения доставляются во все экземпляры. - person Aditya; 20.07.2020