Обработка системных ошибок Apache Kafka

Мы пытаемся реализовать Kafka в качестве нашего решения для брокера сообщений. Мы развертываем наши микросервисы Spring Boot в IBM BLuemix, внутренняя реализация брокера сообщений которого - Kafka версии 0.10. Поскольку мой опыт больше связан с JMS, ActiveMQ, мне было интересно, каким должен быть идеальный способ обработки ошибок системного уровня у потребителей java?

Вот как мы это реализовали в настоящее время

Потребительские свойства

enable.auto.commit=false
auto.offset.reset=latest

Мы используем свойства по умолчанию для

max.partition.fetch.bytes
session.timeout.ms

Kafka Consumer

Мы запускаем 3 потока для каждой темы с одинаковым groupId, то есть по одному экземпляру KafkaConsumer на поток. На данный момент у нас только один раздел. Код потребителя выглядит так в конструкторе класса потока

kafkaConsumer = new KafkaConsumer<String, String>(properties);

    final List<String> topicList = new ArrayList<String>();
    topicList.add(properties.getTopic());

    kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
            try {
                logger.info("Partitions assigned, consumer seeking to end.");

                for (final TopicPartition partition : partitions) {
                    final long position = kafkaConsumer.position(partition);
                    logger.info("current Position: " + position);

                    logger.info("Seeking to end...");
                    kafkaConsumer.seekToEnd(Arrays.asList(partition));
                    logger.info("Seek from the current position: " + kafkaConsumer.position(partition));
                    kafkaConsumer.seek(partition, position);
                }
                logger.info("Consumer can now begin consuming messages.");
            } catch (final Exception e) {
                logger.error("Consumer can now begin consuming messages.");
            }

        }
    });  

Фактическое чтение происходит в методе выполнения потока

try {
            // Poll on the Kafka consumer every second.
            final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);


            // Iterate through all the messages received and print their
            // content.
            for (final TopicPartition partition : records.partitions()) {

                final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                logger.info("consumer is alive and is processing   "+ partitionRecords.size() +" records");
                for (final ConsumerRecord<String, String> record : partitionRecords) {
                    logger.info("processing topic  "+ record.topic()+" for key "+record.key()+" on offset "+ record.offset());

                    final Class<? extends Event> resourceClass = eventProcessors.getResourceClass();
                    final Object obj = converter.convertToObject(record.value(), resourceClass);
                    if (obj != null) {
                        logger.info("Event: " + obj + " acquired by  " + Thread.currentThread().getName());
                        final CommsEvent event = resourceClass.cast(converter.convertToObject(record.value(), resourceClass));
                        final MessageResults results = eventProcessors.processEvent(event
                                );
                        if ("Success".equals(results.getStatus())) {
                            // commit the processed message which changes
                            // the offset
                            kafkaConsumer.commitSync();
                            logger.info("Message processed sucessfully");
                        } else {
                            kafkaConsumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                            logger.error("Error processing message : {} with error : {},resetting offset to {} ", obj,results.getError().getMessage(),record.offset());
                            break;
                        }

                    }
                }

            }
            // TODO add return

        } catch (final Exception e) {
            logger.error("Consumer has failed with exception: " + e, e);
            shutdown();
        }  

Вы заметите EventProcessor, который является классом обслуживания, который обрабатывает каждую запись, в большинстве случаев фиксирует запись в базе данных. Если процессор выдает ошибку (системное исключение или ValidationException), мы не фиксируем, а программно устанавливаем поиск на это смещение, так что последующий опрос будет возвращаться из этого смещения для этого идентификатора группы.

Теперь сомнение в том, что это правильный подход? Если мы получим ошибку и зададим смещение, то до тех пор, пока она не будет исправлена, другие сообщения не будут обрабатываться. Это может сработать для системных ошибок, таких как невозможность подключения к БД, но если проблема связана только с этим событием, а не с другими для обработки этой одной записи, мы не сможем обработать любую другую запись. Мы подумали о концепции ErrorTopic, в которой, когда мы получим ошибку, потребитель опубликует это событие в ErrorTopic, а тем временем он продолжит обработку других последующих событий. Но похоже, что мы пытаемся внедрить концепции дизайна JMS (из-за моего предыдущего опыта) в kafka, и может быть лучший способ решить проблему обработки ошибок в kafka. Также повторная обработка из темы ошибки может изменить последовательность сообщений, которая нам не нужна для некоторых сценариев.

Пожалуйста, дайте мне знать, как кто-то справился с этим сценарием в своих проектах в соответствии со стандартами Kafka.

-Тата


person Tatha    schedule 15.05.2017    source источник
comment
Вы используете Message Hub или самостоятельно реализовали брокер Kafka в Bluemix?   -  person ValerieLampkin    schedule 23.05.2017
comment
@ValerieLampkin Мы используем концентратор сообщений   -  person Tatha    schedule 24.05.2017


Ответы (1)


если проблема связана только с этим событием, а не с другими для обработки этой одной записи, мы не сможем обработать любую другую запись

это правильно, и ваше предложение использовать тему ошибки кажется возможным.

Я также заметил, что с вашей обработкой onPartitionsAssigned вы по существу не используете смещение, зафиксированное потребителем, так как вам кажется, что вы всегда будете добиваться до конца.

Если вы хотите перезапустить с последнего успешно зафиксированного смещения, вам не следует выполнять seek

Наконец, я хотел бы отметить, хотя, похоже, вы знаете, что наличие 3 потребителей в одной группе, подписанных на один раздел, означает, что 2 из 3 будут простаивать.

HTH Edo

person Edoardo Comar    schedule 24.05.2017
comment
Спасибо, что изучили это. Да, во время отладки я понял, что мы стремимся к завершению, и в результате, когда приложение было недоступно, и если в течение этого времени были отправлены новые события, ни один потребитель никогда не обработал это. Мы удалили стремление к концу. Кроме того, как вы упомянули, наличие нескольких потоков, указывающих на один раздел для одной и той же группы потребителей, не работает. Мы изменили это на только один поток. - person Tatha; 24.05.2017