Получать события от AMQP с Axon 4

Я пытаюсь отправлять сообщения через rabbitmq в систему на основе весенней загрузки axon4. Сообщение получено, но события не запускаются. Я очень уверен, что мне не хватает важной части, но до сих пор я не мог этого понять.

Вот соответствующая часть моего application.yml

axon:
    amqp:
        exchange: axon.fanout
        transaction-mode: publisher_ack
    # adding the following lines changed nothing
    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing
spring:
    rabbitmq:
        username: rabbit
        password: rabbit

Из документов я обнаружил, что должен создать bean-компонент SpringAMQPMessageSource:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class AxonConfig {

    @Bean
    SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "in.queue")
            @Override
            public void onMessage(final Message message, final Channel channel) {
                log.debug("received external message: {}, channel: {}", message, channel);
                super.onMessage(message, channel);
            }
        };
    }

}

Если я отправлю сообщение в очередь из административной панели rabbitmq, я вижу журнал:

AxonConfig : received external message: (Body:'[B@13f7aeef(byte[167])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=in.queue, deliveryTag=2, consumerTag=amq.ctag-xi34jwHHA__xjENSteX5Dw, consumerQueue=in.queue]), channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@11703cc8 Shared Rabbit Connection: SimpleConnection@581cb879 [delegate=amqp://[email protected]:5672/, localPort= 58614]

Здесь Агрегат, который должен получать события:

import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import pm.mbo.easyway.api.app.order.commands.ConfirmOrderCommand;
import pm.mbo.easyway.api.app.order.commands.PlaceOrderCommand;
import pm.mbo.easyway.api.app.order.commands.ShipOrderCommand;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import static org.axonframework.modelling.command.AggregateLifecycle.apply;

@ProcessingGroup("amqpEvents")
@Slf4j
@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

    @CommandHandler
    public OrderAggregate(final PlaceOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct()));
    }

    @CommandHandler
    public void handle(final ConfirmOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderConfirmedEvent(orderId));
    }

    @CommandHandler
    public void handle(final ShipOrderCommand command) {
        log.debug("command: {}", command);
        if (!orderConfirmed) {
            throw new IllegalStateException("Cannot ship an order which has not been confirmed yet.");
        }
        apply(new OrderShippedEvent(orderId));
    }

    @EventSourcingHandler
    public void on(final OrderPlacedEvent event) {
        log.debug("event: {}", event);
        this.orderId = event.getOrderId();
        orderConfirmed = false;
    }

    @EventSourcingHandler
    public void on(final OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    @EventSourcingHandler
    public void on(final OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    protected OrderAggregate() {
    }

}

Таким образом, проблема в том, что сообщения принимаются системой, но никакие события не запускаются. Содержание сообщений кажется неуместным. Что бы я ни отправлял в очередь, я получаю сообщение журнала только от моего метода onMessage.

JavaDoc из SpringAMQPMessageSource говорит следующее:

/**
 * MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
 * <p>
 * The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
 * to all subscribed processors.
 * <p>
 * Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
 * be consumed from the AMQP Queue without any processor processing them.
 *
 * @author Allard Buijze
 * @since 3.0
 */

Но до сих пор я не мог понять, где и как его зарегистрировать.

Записи axon.eventhandling в моей конфигурации и @ProcessingGroup ("amqpEvents") в моем Aggregate уже прошли тестирование. Но наличие или отсутствие этих записей вообще не имело значения. Тоже пробовал без режима = подписки.

Точные версии: Spring Boot 2.1.4, Axon 4.1.1, axon-amqp-spring-boot-autoconfigure 4.1.

Любая помощь или подсказки приветствуются.


Обновление 23.04.19:

Я попытался написать свой собственный класс вот так:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {

    private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
    private final AMQPMessageConverter messageConverter;

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    @Override
    public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
        eventProcessors.add(messageProcessor);
        log.debug("subscribe to: {}", messageProcessor);
        return () -> eventProcessors.remove(messageProcessor);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received external message: {}, channel: {}", message, channel);
        log.debug("eventProcessors: {}", eventProcessors);
        if (!eventProcessors.isEmpty()) {
            messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
                            .ifPresent(event -> eventProcessors.forEach(
                                ep -> ep.accept(Collections.singletonList(event))
                            ));
        }
    }

}

Результат тот же, и теперь журнал доказывает, что обработчики событий просто пусты.

eventProcessors: []

Итак, вопрос в том, как правильно зарегистрировать обработчики событий. Есть ли способ сделать это правильно с помощью Spring?

Обновление2:

Также не повезло с этим:

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {

        try {
            final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
            eventProcessorsField.setAccessible(true);
            final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
            log.debug("eventProcessors: {}", eventProcessors);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}
axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING

Его программная регистрация в дополнение к вышесказанному также не помогла:

    @Autowired
    void configure(EventProcessingModule epm,
                   RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
        epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
        epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
    }

Конечно, @ProcessingGroup ("amqpEvents") присутствует в моем классе, который содержит аннотированные методы @EventSourcingHandler.


Обновление 25.4.19:

см. принятый ответ от Алларда. Большое спасибо за то, что указали мне на ошибку, которую я сделал: я пропустил, что EventSourcingHandler не получает сообщения извне. Это для прогнозов. Не для распространения Агрегатов! ups Вот конфигурация / классы, которые сейчас получают события от rabbitmq:

axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {

    private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();

    @EventHandler
    public void on(OrderPlacedEvent event) {
        log.debug("event: {}", event);
        String orderId = event.getOrderId();
        orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
    }

    @EventHandler
    public void on(OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderConfirmed();
            return orderedProduct;
        });
    }

    @EventHandler
    public void on(OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderShipped();
            return orderedProduct;
        });
    }

    @QueryHandler
    public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
        log.debug("query: {}", query);
        return new ArrayList<>(orderedProducts.values());
    }

}

Конечно, я удалил @ProcessingGroup из своего агрегата.

Мои журналы:

RabbitMQSpringAMQPMessageSource : received message: ... 
OrderedProductsEventHandler : event: OrderShippedEvent...

person Manuel    schedule 20.04.2019    source источник


Ответы (1)


В Axon агрегаты не получают события "извне". Обработчики событий внутри агрегатов (точнее, EventSourcingHandlers) обрабатывают только события, опубликованные тем же агрегатным экземпляром, чтобы он мог восстановить свое предыдущее состояние.

Только внешние обработчики событий, например те, которые обновляют прогнозы, будут получать события из внешних источников.

Чтобы это работало, ваш application.yml должен упоминать имя bean-компонента в качестве источника процессора вместо имени очереди. Итак, в вашем первом примере:

    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing

Должно стать:

    eventhandling:
        processors:
            amqpEvents:
                source: inputMessageSource
                mode: subscribing

Но опять же, это работает только для обработчиков событий, определенных для компонентов, а не для агрегатов.

person Allard    schedule 24.04.2019
comment
Спасибо, Аллард! Это помогло. Ошибка с источником уже обнаружена, но изменение имени тоже не помогло. Мой MessageSource уже был правильным, но мне не хватало того, чтобы Aggregates / EventSouringHandler не получали сообщения извне. И если подумать, это имеет смысл. Перенос аннотации @ProcessingGroup (amqpEvents) над классом с помощью методов @EventHandler сразу сработал. - person Manuel; 25.04.2019