Сага о MassTransit с предварительной выборкой ›1

У меня есть конечный автомат саги MassTransit (полученный из Automatonymous.MassTransitStateMachine), и я пытаюсь обойти проблему, которая проявляется только тогда, когда я устанавливаю prefetchCount конфигурации конечной точки на значение больше 1.

Проблема в том, что событие StartupCompletedEvent публикуется, а затем сразу обрабатывается, прежде чем состояние саги будет сохранено в базе данных.

Конечный автомат настроен следующим образом:

State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);

Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));


Initially(
    When(Requested)
        .ThenAsync(async ctx => 
        {
          Console.WriteLine("Starting up...");
          await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
          Console.WriteLine("Done starting up...");
        }
        .TransitionTo(StartingUp)
);


During(StartingUp,
    When(StartupCompleted)
        .ThenAsync(InitialiseSagaInstanceData)
        .TransitionTo(Initialising)
);

// snip...!

Что происходит, когда моя сага получает событие Requested:

  1. Обработчик ThenAsync блока Initially получает попадание. На этом этапе данные саги не сохраняются в репо (как и ожидалось).
  2. StartupCompletedEvent публикуется в шине. Здесь также не хранятся данные саги в репо.
  3. Блок ThenAsync объявления Initially завершается. После этого данные саги наконец сохраняются.
  4. Больше ничего не происходит.

На данный момент в очереди нет сообщений, и событие StartupCompletedEvent потеряно. Однако в базе есть экземпляр саги.

Я поигрался с запуском и определил, что один из других потоков (поскольку моя предварительная выборка> 1) уловил событие, не нашел никакой саги с correlationId в базе данных и отбросил событие. Таким образом, событие публикуется и обрабатывается до того, как сага сможет сохраниться.

Если я добавлю в обработчик Initially следующее:

When(StartupCompleted)
    .Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))

Затем я получаю выполнение Console.WriteLine. Насколько я понимаю, событие было получено, но перенаправлено в обработчик Initially, поскольку с correlationId не существует саги. Если я поставлю точку останова на этом этапе и проверю репозиторий саги, саги еще нет.

Возможно, стоит упомянуть еще несколько моментов:

  1. У меня установлен контекст репозитория саги для использования IsolationLevel.Serializable
  2. Я использую EntityFrameworkSagaRepository
  3. Все работает, как ожидалось, когда счетчик предварительной выборки установлен на 1
  4. Я использую Ninject для DI, а мой SagaRepository имеет область видимости потоков, поэтому я предполагаю, что каждый обработчик, который разрешает счетчик предварительной выборки, имеет свою собственную копию репозитория саги.
  5. Если я опубликую StartupCompletedEvent в отдельном потоке со спящим 1000 мс перед ним, все будет работать правильно. Я предполагаю, что это связано с тем, что репо саги завершило сохранение состояния саги, поэтому, когда событие в конечном итоге публикуется и принимается обработчиком, состояние саги извлекается из репо правильно.

Пожалуйста, дайте мне знать, если я что-то упустил; Я попытался предоставить все, что считаю целесообразным, не затягивая этот вопрос ...


person Robert    schedule 02.08.2016    source источник
comment
Я нашел заявление о MT3, в котором утверждается, что события проводятся до этапа сохранения (groups.google.com/d/msg/masstransit-discuss/Jom6ns5jF-w/). Однако я совсем не наблюдаю такого поведения. Пользуюсь МТ 3.3.5.   -  person Robert    schedule 03.08.2016
comment
Вы должны включить InMemoryOutbox (), если хотите отложить публикацию до тех пор, пока сага не будет сохранена.   -  person Chris Patterson    schedule 03.08.2016
comment
@ChrisPatterson Спасибо - похоже, это исправило. Не могли бы вы пояснить, почему это не настройка по умолчанию? Я не могу представить себе ситуацию, когда вы не захотели бы такого поведения ...   -  person Robert    schedule 04.08.2016
comment
Это компонент промежуточного программного обеспечения, и он требуется не всем. Итак, он включен и добавлен в конвейер.   -  person Chris Patterson    schedule 04.08.2016
comment
Я понимаю, что с точки зрения фреймворка это промежуточный компонент. Но с точки зрения разработчика, использующего фреймворк, я потратил на это много времени и усилий; это не является явным ни в одной документации, и, потратив некоторое время на просмотр исходного кода MT, мне пришлось обратиться к вопросу SO. Я до сих пор не понимаю, в какой ситуации вам действительно не нужно такое поведение.   -  person Robert    schedule 04.08.2016
comment
Он используется только для потребителей, которые публикуют другие сообщения и делают некоторые другие (транзакционные) вещи, например, сохраняют что-то. Но само промежуточное ПО уже включено в конвейер. Таким образом, установка его по умолчанию приведет к бесполезному промежуточному программному обеспечению в конвейере. Я определенно согласен с тем, что это должно быть в документации.   -  person Alexey Zimarev    schedule 17.08.2016


Ответы (1)


У меня тоже была эта проблема, и я хотел бы опубликовать комментарий Криса в качестве ответа, чтобы люди могли его найти.

Решение состоит в том, чтобы включить папку «Исходящие», чтобы сообщения сохранялись до тех пор, пока сага не будет сохранена.

c.ReceiveEndpoint("queue", e =>
{
    e.UseInMemoryOutbox();
    // other endpoint configuration here
}
person Alexey Zimarev    schedule 17.08.2016