Как объединить сообщения из канала очереди с помощью Spring Integration DSL?

я определяю канал очереди

@Bean("mail-action-laundry-list-channel")
public MessageChannel mailRecipientActionMessageChannel() {
    return new QueueChannel(20);
    }

В приведенном ниже потоке я буду добавлять сообщения из канала очереди, я пробовал это:

@Bean
public IntegrationFlow mailRecipientActionLaundryListMessageFlow(@Qualifier("laundryListMessageHandler") MessageHandler laundryListMessageHandler) {
    return IntegrationFlows.from("mail-action-laundry-list-channel")
            .log("--> laundry list messages::")
            .aggregate(aggregatorSpec -> aggregatorSpec
                    .correlationExpression("#this.payload.email")
                    .releaseExpression("#this.size() == 5")
                    .messageStore(new SimpleMessageStore(100))
                    .groupTimeout(2000))
            .transform(laundryListMessageToItemProcessDtoTransformer())
            .handle(laundryListMessageHandler)
            .get();
}

но почему он всегда объединяет первые 5 сообщений с канала, а другое сообщение больше не агрегирует


person steven    schedule 23.03.2018    source источник


Ответы (1)


На агрегаторе нужно настроить expireGroupsUponCompletion(true):

Если задано значение true (по умолчанию false), завершенные группы удаляются из хранилища сообщений, что позволяет последующим сообщениям с той же корреляцией сформировать новую группу. Поведение по умолчанию - отправлять сообщения с той же корреляцией, что и завершенная группа, на канал сброса.

Похоже, ваши последующие сообщения из очереди имеют такое же свойство email. Следовательно, агрегатор не может сформировать новую группу для того же ключа корреляции.

https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#aggregator-config.

person Artem Bilan    schedule 23.03.2018
comment
да вы правы, следующая группа установит такой же ключ. вы отвечаете, я попробую. спасибо - person steven; 26.03.2018