Интеграция Spring: вовлекайте в агрегацию все заголовки, а не только последний

У меня есть поток интеграции Spring, определенный следующим образом:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .acknowledgeMode(MANUAL)
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

И активатор службы определяется следующим образом:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(List<> Collection<MyEvent> events) {
        ....
    }    
}

Я пытаюсь изменить myMethod, чтобы он также принимал список конкретных заголовков, связанных с каждым сообщением в агрегации. В моем случае я хотел бы получить AmqpHeaders.CHANNEL и AmqpHeaders.DELIVERY_TAG для каждого сообщения, чтобы я мог выполнять ACK или NACK для каждого сообщения в RabbitMQ (обратите внимание, что я намеренно использую ручной режим подтверждения в IntegrationFlow, поскольку я хочу для отправки ACK / NACK после myMethod выполнения).

Я пробовал, например, такой подход:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels, 
                         @Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags, 
                         Collection<MyEvent> events) {
        ....
    }    
}

но здесь я, кажется, получаю значения заголовка только для последнего сообщения (т.е. channels и tags всегда имеют размер 1, хотя в коллекции events есть несколько событий).

Я также попытался изменить Collection<MyEvent> на Collection<Message> (org.springframework.messaging.Message), чтобы попытаться вручную извлечь заголовки, но это не удалось:

org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message

поскольку сообщение уже преобразовано преобразователем сообщений, определенным в IntegrationFlow.

Как я могу этого добиться?


person Johan    schedule 27.11.2017    source источник


Ответы (1)


Вам понадобится пользовательский outputProcessor на вашем aggregator, чтобы возвращать желаемое сообщение (со списками каналов / тегов доставки в заголовках и списком событий в полезной нагрузке).

person Gary Russell    schedule 27.11.2017