Как обрабатывать ошибки после того, как сообщение было передано в QueueChannel?

У меня есть 10 очередей rabbitMQ, называемых event.q.0, event.q.2, ‹...>, event.q.9. Каждая из этих очередей получает сообщения, перенаправленные из обмена event.consistent-hash. Я хочу создать отказоустойчивое решение, которое будет последовательно использовать сообщения для определенного события, поскольку порядок важен. Для этого я настроил поток, который прослушивает эти очереди и направляет сообщения на основе идентификатора события в определенный рабочий поток. Рабочие потоки работают на основе каналов очереди, что должно гарантировать порядок FIFO для события с определенным идентификатором. Я придумал следующую настройку:

@Bean
public IntegrationFlow eventConsumerFlow(RabbitTemplate rabbitTemplate, Advice retryAdvice) {
    return IntegrationFlows
            .from(
                    Amqp.inboundAdapter(new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()))
                            .configureContainer(c -> c
                                    .adviceChain(retryAdvice())
                                    .addQueueNames(queueNames)
                                    .prefetchCount(amqpProperties.getPreMatch().getDefinition().getQueues().getEvent().getPrefetch())
                            )
                            .messageConverter(rabbitTemplate.getMessageConverter())
            )
            .<Event, String>route(e -> String.format("worker-input-%d", e.getId() % numberOfWorkers))
            .get();
}

private Advice deadLetterAdvice() {
    return RetryInterceptorBuilder
            .stateless()
            .maxAttempts(3)
            .recoverer(recoverer())
            .backOffPolicy(backOffPolicy())
            .build();
}

private ExponentialBackOffPolicy backOffPolicy() {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(3.0);
    backOffPolicy.setMaxInterval(15000);

    return backOffPolicy;
}

private MessageRecoverer recoverer() {
    return new RepublishMessageRecoverer(
            rabbitTemplate,
            "error.exchange.dlx"
    );
}

@PostConstruct
public void init() {
    for (int i = 0; i < numberOfWorkers; i++) {
        flowContext.registration(workerFlow(MessageChannels.queue(String.format("worker-input-%d", i), queueCapacity).get()))
                .autoStartup(false)
                .id(String.format("worker-flow-%d", i))
                .register();
    }
}

private IntegrationFlow workerFlow(QueueChannel channel) {
    return IntegrationFlows
        .from(channel)
        .<Object, Class<?>>route(Object::getClass, m -> m
                .resolutionRequired(true)
                .defaultOutputToParentFlow()
                .subFlowMapping(EventOne.class, s -> s.handle(oneHandler))
                .subFlowMapping(EventTwo.class, s -> s.handle(anotherHandler))
        )
        .get();
}

Теперь, когда, скажем, происходит ошибка в eventConsumerFlow, механизм повтора работает, как и ожидалось, но когда ошибка происходит в workerFlow, повторная попытка больше не работает, и сообщение не отправляется в обмен недоставленными сообщениями. Я предполагаю, что это связано с тем, что как только сообщение передается в QueueChannel, оно автоматически подтверждается. Как сделать так, чтобы механизм повтора работал и в workerFlow, чтобы, если там произойдет исключение, он мог повторить попытку пару раз и отправить сообщение в DLX, когда попытки исчерпаны?


person Eduardas Kazakas    schedule 30.01.2019    source источник


Ответы (1)


Если вам нужна отказоустойчивость, вам вообще не следует использовать каналы очереди; сообщения будут подтверждены сразу после того, как сообщение будет помещено в очередь в памяти; в случае сбоя сервера эти сообщения будут потеряны.

Вам следует настроить отдельный адаптер для каждой очереди, если вы не хотите потери сообщений.

Тем не менее, чтобы ответить на общий вопрос, любые ошибки в нисходящих потоках (в том числе после канала очереди) будут отправлены на errorChannel, определенный на входящем адаптере.

person Gary Russell    schedule 30.01.2019
comment
Спасибо, Гэри, это то, о чем я подумал, поскольку в документации четко написано, что если сообщение передается другому потоку или помещается в очередь в памяти, оно автоматически подтверждается, когда режим подтверждения установлен на AUTO. Я подумал, может быть, есть другой способ обойти это. но похоже, что единственным способом будет поток/входящий адаптер для каждой очереди. - person Eduardas Kazakas; 31.01.2019
comment
Вы можете использовать РУЧНОЙ режим подтверждения (и подтверждать или отклонять сообщения, чтобы отправить их на DLX), но вы можете в конечном итоге зависнуть от одного или нескольких процессов, потому что счетчик предварительной выборки применяется ко всем очередям (с SimpleMessageListenerContainer; с DIrectMessageListenerContainer, потребитель (или потребители) выделены для каждой очереди, поэтому счетчик предварительной выборки не будет проблемой. - person Gary Russell; 31.01.2019
comment
Я попробовал этот подход, и он работает нормально, если во время преобразования сообщений нет ошибок. Если в messageConverter есть исключение, поток не добавляет дополнительные заголовки, такие как amqp_channel и amqp_deliveryTag. Я мог бы обойти это, выполнив преобразование сообщений в преобразователе. - person Eduardas Kazakas; 31.01.2019
comment
Ну, ошибка преобразования сообщения возникает до того, как появится errr..., Message<?> - поскольку эти заголовки добавляются к сообщению преобразователем, есть ловушка-22 - сообщения еще нет. Мы могли бы, я полагаю, добавить эти заголовки непосредственно в ErrorMessage (поскольку свойства failedMessage нет). Откройте проблему GitHub, и мы посмотрим, если вы хотите, чтобы мы изучить его. - person Gary Russell; 31.01.2019