Аксон 4: EventSourcingHandler не запускается при применении события из другого потока

Я столкнулся с небольшой проблемой с обработкой команд в Axon 4.

Скажем, у меня есть агрегат, которому нужно вызывать внешнюю службу при обработке команды.

Внешняя служба использует асинхронный клиент (vertx tcp client + rxjava), поэтому ответ дается в другом потоке, чем тот, который создал агрегированный экземпляр.

Я хочу применить событие, учитывая результат моей службы, но это не работает, потому что вызов AggregateLifecycle.apply() находится в другом потоке ...

Как я могу «перенести» объем агрегата?

Вот небольшой пример того, что я хочу сделать (использует rxjava 2 и lombok):

Совокупный:

@Slf4j
@Aggregate
@NoArgsConstructor
public class MyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public MyAggregate(CreationCommand creationCommand) {
        Single.just("some data")
                .observeOn(Schedulers.computation()) // <- comment this line and the test pass, uncomment and it fail because apply is on another thread ?
                .subscribe((s, throwable) -> apply(new AggregateCreatedEvent(creationCommand.getId())));
    }

    @EventSourcingHandler
    public void on(AggregateCreatedEvent event) {
        this.id = event.getId();
    }
}

@Value class CreationCommand { String id; }
@Value class AggregateCreatedEvent { String id;}

И тест:

public class MyAggregateTest {

    AggregateTestFixture<MyAggregate> testFixture = new AggregateTestFixture<>(MyAggregate.class);

    @Test
    public void test() {
        testFixture.givenNoPriorActivity()
                .when(new CreationCommand("123"))
                .expectEvents(new AggregateCreatedEvent("123"));
    }
}

Вот ошибка, которую я получил:

java.lang.IllegalStateException: Cannot request current Scope if none is active

person Sylvain    schedule 04.01.2019    source источник
comment
См. Предыдущий ответ автора фреймворка: stackoverflow.com/questions/52937596 / axon-completablefuture /   -  person Mzzl    schedule 05.01.2019
comment
Привет, Mzzl, спасибо за ссылки. Это то, что я сделал, но это не решает мою проблему, так как я хочу, чтобы мой агрегат обновлялся после этого события в том же UnitOfWork / Transaction ... Я пытался отправить общее сообщение о событии, но мне нужен общий Сообщение о событии домена плюс транзакция.   -  person Sylvain    schedule 07.01.2019
comment
Событие должно применяться в потоке, который управляет этой единицей работы, в данном случае CommandHandler. Я понимаю, что вы хотите, чтобы ваше приложение было неблокирующим и асинхронным, но Axon предоставляет для этого свои собственные механизмы. CommandBus принимает команды асинхронно, а события обрабатываются обработчиками событий асинхронно. Агрегатное состояние обновляется EventSourcingHandler. Ничего не получится от асинхронной реализации CommandHandler, и, в любом случае, в настоящее время она не поддерживается.   -  person Mzzl    schedule 07.01.2019
comment
Я думаю, вы можете добавить свой комментарий @Mzzl к фактическому ответу на этот вопрос. Axon предоставляет собственный механизм для асинхронных операций.   -  person Steven    schedule 17.01.2019
comment
хорошо, похоже, у меня проблема с дизайном: я разместил здесь еще один вопрос: stackoverflow.com/questions/54287285/   -  person Sylvain    schedule 21.01.2019


Ответы (1)


Событие должно применяться в потоке, который управляет этой единицей работы, в данном случае CommandHandler. Axon предоставляет собственные механизмы для асинхронных операций. CommandBus принимает команды асинхронно, а события обрабатываются обработчиками событий асинхронно. Ничего не получится от асинхронной реализации CommandHandler, и, в любом случае, в настоящее время она не поддерживается.

Все данные, необходимые для применения события, обычно должны быть доступны в команде или в агрегированном состоянии, а не из дополнительных внешних источников.

Вероятно, это то, что вы хотите, чтобы ваш обработчик команд выглядел так:

@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
    apply(new AggregateCreatedEvent(creationCommand.getId());
}
person Mzzl    schedule 18.01.2019