KafkaStream не получает сообщений от темы

Я играю с KafkaStreams и KafkaConnect, просто пытаюсь получить сообщения из темы. У меня есть "стандартный" пакетный потребитель, настроенный для этой темы, и он отлично работает. Сначала я отправляю пару записей в Kafka, а потом использую их. Теперь я хочу сделать то же самое с потоками Kakfa, но не получаю ни одного сообщения из темы. Вот код потребителя, который я использую.

final int NUMBER_OF_PARTITIONS = 4;
final Properties consumerConfig = new Properties();
consumerConfig.setProperty("zookeeper.connect", RULE.getConfiguration().kafka.getZookeeperUrl());
consumerConfig.setProperty("backoff.increment.ms", "100");
consumerConfig.setProperty("group.id", "java-consumer-example");
consumerConfig.setProperty("consumer.timeout.ms", "1000000");
consumerConfig.setProperty("client.id", "someclient");
consumerConfig.setProperty("auto.offset.reset", "smallest");
consumerConfig.setProperty("enable.auto.commit", "false");
consumerConfig.setProperty("bootstrap.servers", RULE.getConfiguration().kafka.getHosts());

final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
final TopicFilter sourceTopicFilter = new Whitelist(RULE.getConfiguration().kafka.getTopic());

final VerifiableProperties decoderProps = new VerifiableProperties();
decoderProps.props().setProperty("schema.registry.url", RULE.getConfiguration().kafka.getRegistry());
decoderProps.props().setProperty("max.schemas.per.subject", "1");
final List<KafkaStream<String, Object>> streams = connector
    .createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS, new StringDecoder(decoderProps), new KafkaAvroDecoder(decoderProps));

final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARTITIONS);
for (final KafkaStream stream : streams) {
    executorService.submit(() -> {
        try {
            final ConsumerIterator it = stream.iterator();
            while (it.hasNext()) {
                final MessageAndMetadata messageAndMetadata = it.next();
                final String key = (String) messageAndMetadata.key();
                System.out.println("KEY" + key);
            }
        } catch (final Exception ex) {
            LOGGER.error("ERROR", ex);
        }
    });
}

Моя проблема в том, что мой код продолжает ждать в состоянии it.hasNext(), пока не истечет время ожидания. Я, наверное, упускаю здесь какие-то детали, но не могу понять, почему я ничего не понимаю по теме. В рамках этого теста у меня есть производитель, который отправляет несколько записей в эту тему прямо перед запуском потребителя, поэтому это не может быть проблемой смещения. Любые идеи приветствуются.


person Patze    schedule 17.11.2016    source источник


Ответы (1)


Я нашел решение. Ошибка была вне кода, который я опубликовал. Тайм-аут, который я предоставил для завершения работы ExecutorService, был слишком коротким, поэтому он просто отключил его, не предоставив достаточно времени для выполнения работы потребителя.

person Patze    schedule 17.11.2016