Я использую KafkaIO в потоке данных для чтения сообщений из одной темы. Я использую следующий код.
KafkaIO.<String, String>read()
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
.build())
// .commitOffsetsInFinalize()
.withTopics(Collections.singletonList(topicNames))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata();
Я запускаю программу потока данных на своем локальном компьютере с помощью прямого бегуна. Все работает нормально. Я запускаю другой экземпляр той же программы параллельно, то есть другого потребителя. Теперь я вижу повторяющиеся сообщения при обработке конвейера.
Хотя я предоставил идентификатор группы потребителей, запуск другого потребителя с тем же идентификатором группы потребителей (другой экземпляр той же программы) не должен обрабатывать те же элементы, которые обрабатываются другим потребителем, верно?
Как это получается при использовании бегуна потока данных?