Это моя текущая установка:
queue1 и queue2 объединяются вместе с потоком интеграции в channel1:
@Bean
public IntegrationFlow q1f() {
return IntegrationFlows
.from(queue1InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
@Bean
public IntegrationFlow q2f() {
return IntegrationFlows
.from(queue2InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
затем все агрегируется, а затем подтверждается после подтверждения агрегированного сообщения с помощью rabbitmq:
@Bean
public IntegrationFlow aggregatingFlow() {
return IntegrationFlows
.from(amqpInputChannel())
.aggregate(...
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10))
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
)
.handle(amqpOutboundEndpoint())
.get();
}
@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
outboundEndpoint.setConfirmAckChannel(manualAckChannel());
outboundEndpoint.setConfirmCorrelationExpressionString("#root");
outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
return outboundEndpoint;
}
ackTemplate()
устанавливается с помощью cf, имеющего springFactory.setPublisherConfirms(true);
.
Проблема, которую я вижу, заключается в том, что раз в 10 дней некоторые сообщения застревают в состоянии unacknowledged
в rabbitmq.
Я предполагаю, что каким-то образом публикация сообщения ждет, пока кролик выполнит PUBLISHER CONFIRMS
, но он никогда не получает его и время ожидания истекает? В этом случае я никогда не отправляю сообщение ACK в queue1
. Это возможно?
Итак, еще раз завершите рабочий процесс:
[две очереди -> прямой канал -> агрегатор (сохраняет значения канала и тега) -> публикация для кролика -> кролик возвращает ACK через подтверждение издателя -> spring подтверждает все сообщения на канале + значения, которые он хранил в памяти для агрегированного сообщения]
У меня также есть реализация агрегатора (так как мне нужно вручную подтверждать сообщения как от q1, так и от q2):
public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";
private AckingState ackingState;
public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState){
this.ackingState = ackingState;
}
@Override
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
Map<String, Object> aggregatedHeaders = super.aggregateHeaders(group);
List<ManualAckPair> manualAckPairs = new ArrayList<>();
group.getMessages().forEach(m -> {
Channel channel = (Channel)m.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long)m.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
manualAckPairs.add(new ManualAckPair(channel, deliveryTag, ackingState));
});
aggregatedHeaders.put(MANUAL_ACK_PAIRS, manualAckPairs);
return aggregatedHeaders;
}
}
ОБНОВЛЕНИЕ
Так выглядит админ-кролик (2 незапакованных сообщения на долгое время, и оно не будет ACKED до перезапуска - при повторной доставке):
ConnectionFactory
дляackTemplate
, поэтому каналы AMQP не блокируются во время отправки. - person Artem Bilan   schedule 15.02.2019