У меня есть поток интеграции Spring, определенный следующим образом:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.acknowledgeMode(MANUAL)
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
И активатор службы определяется следующим образом:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(List<> Collection<MyEvent> events) {
....
}
}
Я пытаюсь изменить myMethod
, чтобы он также принимал список конкретных заголовков, связанных с каждым сообщением в агрегации. В моем случае я хотел бы получить AmqpHeaders.CHANNEL
и AmqpHeaders.DELIVERY_TAG
для каждого сообщения, чтобы я мог выполнять ACK или NACK для каждого сообщения в RabbitMQ (обратите внимание, что я намеренно использую ручной режим подтверждения в IntegrationFlow, поскольку я хочу для отправки ACK / NACK после myMethod
выполнения).
Я пробовал, например, такой подход:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels,
@Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags,
Collection<MyEvent> events) {
....
}
}
но здесь я, кажется, получаю значения заголовка только для последнего сообщения (т.е. channels
и tags
всегда имеют размер 1, хотя в коллекции events
есть несколько событий).
Я также попытался изменить Collection<MyEvent>
на Collection<Message>
(org.springframework.messaging.Message
), чтобы попытаться вручную извлечь заголовки, но это не удалось:
org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message
поскольку сообщение уже преобразовано преобразователем сообщений, определенным в IntegrationFlow.
Как я могу этого добиться?