Интеграция Spring - Издатель подтверждает тайм-аут?

Это моя текущая установка:

queue1 и queue2 объединяются вместе с потоком интеграции в channel1:

@Bean
public IntegrationFlow q1f() {
    return IntegrationFlows
            .from(queue1InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

@Bean
public IntegrationFlow q2f() {
    return IntegrationFlows
            .from(queue2InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

затем все агрегируется, а затем подтверждается после подтверждения агрегированного сообщения с помощью rabbitmq:

@Bean
    public IntegrationFlow aggregatingFlow() {
        return IntegrationFlows
                .from(amqpInputChannel())
                .aggregate(...
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
                )
                .handle(amqpOutboundEndpoint())
                .get();
    }

    @Bean
    public AmqpOutboundEndpoint amqpOutboundEndpoint() {
        AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
        outboundEndpoint.setConfirmAckChannel(manualAckChannel());
        outboundEndpoint.setConfirmCorrelationExpressionString("#root");
        outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
        outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
        return outboundEndpoint;
    }

ackTemplate() устанавливается с помощью cf, имеющего springFactory.setPublisherConfirms(true);.

Проблема, которую я вижу, заключается в том, что раз в 10 дней некоторые сообщения застревают в состоянии unacknowledged в rabbitmq.

Я предполагаю, что каким-то образом публикация сообщения ждет, пока кролик выполнит PUBLISHER CONFIRMS, но он никогда не получает его и время ожидания истекает? В этом случае я никогда не отправляю сообщение ACK в queue1. Это возможно?

Итак, еще раз завершите рабочий процесс:

[две очереди -> прямой канал -> агрегатор (сохраняет значения канала и тега) -> публикация для кролика -> кролик возвращает ACK через подтверждение издателя -> spring подтверждает все сообщения на канале + значения, которые он хранил в памяти для агрегированного сообщения]

У меня также есть реализация агрегатора (так как мне нужно вручную подтверждать сообщения как от q1, так и от q2):

public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";
    private AckingState ackingState;

    public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState){
        this.ackingState = ackingState;
    }

    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> aggregatedHeaders = super.aggregateHeaders(group);
        List<ManualAckPair> manualAckPairs = new ArrayList<>();
        group.getMessages().forEach(m -> {
            Channel channel = (Channel)m.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long)m.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            manualAckPairs.add(new ManualAckPair(channel, deliveryTag, ackingState));
        });
        aggregatedHeaders.put(MANUAL_ACK_PAIRS, manualAckPairs);
        return aggregatedHeaders;
    }
}

ОБНОВЛЕНИЕ

Так выглядит админ-кролик (2 незапакованных сообщения на долгое время, и оно не будет ACKED до перезапуска - при повторной доставке):  введите описание изображения здесь


person Bojan Vukasovic    schedule 15.02.2019    source источник
comment
Похоже, здесь слишком много настраиваемой бизнес-логики. Нам нужен какой-то проект на GitHub, чтобы поиграть и, возможно, воспроизвести. Как можно проще. Итак, пока никаких идей о том, что происходит. Вы можете играть с разными ConnectionFactory для ackTemplate, поэтому каналы AMQP не блокируются во время отправки.   -  person Artem Bilan    schedule 15.02.2019
comment
@ArtemBilan Я загрузил свой полный поток, и вы также можете найти 1 тест, который использует testcontaienrs (для запуска кролика на локальном докере) и показывает, как работает агрегация. Полнопоточная оп. находится по адресу github.com/bojanv55/spring-integration-aggregate-ack/blob/   -  person Bojan Vukasovic    schedule 16.02.2019
comment
@ArtemBilan Я только что создал в этом проекте соединение через toxi-proxy, чтобы имитировать потерю сообщений подтверждения издателя. В этом случае - ACK от сервера, связанный с доставкой, никогда не поступает в Java-приложение, и мое Java-приложение не может вручную подтвердить сообщение в другой очереди. Есть ли возможность дождаться подтверждения издателя и, если он не получен, повторить отправку сообщения?   -  person Bojan Vukasovic    schedule 19.02.2019


Ответы (2)


В Spring AMQP версии 2.1 (Spring Integration 5.1) мы добавили Future<?> и вернули сообщение в CorrelationData, чтобы помочь с подобными вещами. Если вы используете старую версию, вы можете создать подкласс CorrelationData (и вам придется обрабатывать настройку будущего и возвращаемого сообщения в вашем коде).

Это вместе с запланированной задачей может обнаруживать недостающие подтверждения ...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            SuccessCallback<? super Confirm> successCallback = confirm -> {
                System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
            };
            FailureCallback failureCallback = throwable -> {
                System.out.println(throwable.getMessage());
            };

            // Good - ack
            CorrelationData correlationData = new CorrelationData("good");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "foo", "data", correlationData);

            // Missing exchange nack, no return
            correlationData = new CorrelationData("missing exchange");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("missing exchange", "foo", "data", correlationData);

            // Missing queue ack, with return
            correlationData = new CorrelationData("missing queue");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "missing queue", "data", correlationData);
        };
    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

.

Checking pending acks
Ack received OK for good
Nack received for missing exchange
Message returned for missing queue
No pending acks, exiting

В Spring Integration есть confirmCorrelationExpression, который можно использовать для создания экземпляра CorrelationData.

ИЗМЕНИТЬ

С интеграцией Spring ...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    public interface Gate {

        void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);

    }

    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            gate.send("", "foo", "good");
            gate.send("junque", "rk", "missing exchange");
            gate.send("", "junque", "missing queue");
        };
    }

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gate.class)
                    .handle(Amqp.outboundAdapter(template)
                            .confirmCorrelationExpression("@correlationCreator.create(#root)")
                            .exchangeNameExpression("headers.exchange")
                            .routingKeyExpression("headers.rk")
                            .returnChannel(returns())
                            .confirmAckChannel(acks())
                            .confirmNackChannel(acks()))
                    .get();
    }

    @Bean
    public MessageChannel acks() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel returns() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow ackFlow() {
        return IntegrationFlows.from("acks")
                /*
                 * Work around a bug because the correlation data is wrapped and so the
                 * wrong future is completed.
                 */
                .handle(m -> {
                    System.out.println(m);
                    if (m instanceof ErrorMessage) { // NACK
                        NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                        CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                        correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                    }
                    else {
                        ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow retFlow() {
        return IntegrationFlows.from("returns")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public CorrelationCreator correlationCreator() {
        return new CorrelationCreator(this.futures);
    }

    public static class CorrelationCreator {

        private final BlockingQueue<CorrelationData> futures;

        public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
            this.futures = futures;
        }

        public CorrelationData create(Message<String> message) {
            CorrelationData data = new CorrelationData(message.getPayload());
            this.futures.add(data);
            return data;
        }

    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null
                            && !correlationData.getId().equals("Message was returned")) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());

            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}
person Gary Russell    schedule 21.02.2019
comment
Спасибо за ответ. Это выглядит многовато, но это одно из решений. Не могли бы вы помочь мне с реализацией AmqpOutboundEndpoint, поскольку он всегда будет setConfirmAckChannel, а не в этом будущем обратном вызове? Думаю, мне нужно заменить текущий outboundEndpoint.setConfirmCorrelationExpressionString("#root");? - person Bojan Vukasovic; 21.02.2019
comment
На самом деле я не уверен, стоит ли это вообще реализовывать, поскольку кролик не имеет понятия о тайм-аутах при доставке сообщения и ожидании ACK (поэтому из-за этого можно иметь некоторые сообщения в состоянии UNACK-ed до перезапуска клиента ). Здесь несколько помогает интервал биений сердца. С другой стороны, при отправке сообщения ваше решение кажется работающим, но также кажется слишком сложным для реализации. Я предполагаю, что в худшем случае (без вашего кода) сообщения, возможно, не будут доставлены своевременно (вероятность 0,0001%), но в конечном итоге будут доставлены. - person Bojan Vukasovic; 21.02.2019
comment
Если я использую ваш код с AmqpOutboundEndpoint, я получаю «Некуда отправлять подтверждение подтверждения издателя для ...» - person Bojan Vukasovic; 21.02.2019
comment
Это немного больше связано с Spring Integration, но в основном применяются те же концепции. Однако мне пришлось обойти ошибку, заключающуюся в том, что Future<?> не заполняется автоматически фреймворком, поэтому мне пришлось сделать это в потоке ack / nack. Смотрите редактирование. github.com/spring-projects/spring-integration/issues/2759 - person Gary Russell; 21.02.2019
comment
Спасибо. Это кажется слишком сложным, поэтому я повторно использую AmqpOutboundEndpoint и добавлю ((RabbitTemplate)this.amqpTemplate).invoke(t -> { t.send(exchangeName, routingKey, amqpMessage, correlationData); t.waitForConfirmsOrDie(15_000); //15 seconds wait time return true; }); вместо текущего метода отправки. - person Bojan Vukasovic; 25.02.2019
comment
Это, безусловно, проще и нормально, если вы можете позволить себе снижение производительности (блокирование отправляющего потока до получения подтверждения). Вы можете добиться почти того же, просто включив транзакции (с таким же падением производительности). - person Gary Russell; 25.02.2019
comment
Есть ли планы включить это в следующую версию интеграции. Чтобы иметь какое-то свойство на CachingConnection setPublisherConfirmsTimeout? - person Bojan Vukasovic; 25.02.2019
comment
Мы определенно можем изучить это; пожалуйста, откройте проблему github со ссылкой на это обсуждение. - person Gary Russell; 25.02.2019

вы можете объявить соединение как bean

@Bean
public ConnectionFactory createConnectionFactory(){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setPublisherReturns(true);
    connectionFactory.setPublisherConfirmType(ConfirmType.SIMPLE);
    return connectionFactory;
}

Затем RabbitTemplate как

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(callback);
    return rabbitTemplate;
}

Где обратный вызов - это реализация интерфейса ConfirmCallback

а при отправке можно просто дождаться подтверждения

System.out.println("Sending message...");
        rabbitTemplate.convertAndSend(rabbitMQProperties.getEXCHANGENAME(), 
                rabbitMQProperties.getQUEUENAME(), "hello from rabbit");
        rabbitTemplate.waitForConfirms(1);

waitforconfirms займет время в миллисекундах. Я поставил его как 1 для тестирования.

person anand shukla    schedule 17.04.2020