У меня есть конечный автомат саги MassTransit (полученный из Automatonymous.MassTransitStateMachine), и я пытаюсь обойти проблему, которая проявляется только тогда, когда я устанавливаю prefetchCount конфигурации конечной точки на значение больше 1.
Проблема в том, что событие StartupCompletedEvent публикуется, а затем сразу обрабатывается, прежде чем состояние саги будет сохранено в базе данных.
Конечный автомат настроен следующим образом:
State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);
Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
When(Requested)
.ThenAsync(async ctx =>
{
Console.WriteLine("Starting up...");
await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
Console.WriteLine("Done starting up...");
}
.TransitionTo(StartingUp)
);
During(StartingUp,
When(StartupCompleted)
.ThenAsync(InitialiseSagaInstanceData)
.TransitionTo(Initialising)
);
// snip...!
Что происходит, когда моя сага получает событие Requested:
- Обработчик ThenAsync блока Initially получает попадание. На этом этапе данные саги не сохраняются в репо (как и ожидалось).
- StartupCompletedEvent публикуется в шине. Здесь также не хранятся данные саги в репо.
- Блок ThenAsync объявления Initially завершается. После этого данные саги наконец сохраняются.
- Больше ничего не происходит.
На данный момент в очереди нет сообщений, и событие StartupCompletedEvent потеряно. Однако в базе есть экземпляр саги.
Я поигрался с запуском и определил, что один из других потоков (поскольку моя предварительная выборка> 1) уловил событие, не нашел никакой саги с correlationId в базе данных и отбросил событие. Таким образом, событие публикуется и обрабатывается до того, как сага сможет сохраниться.
Если я добавлю в обработчик Initially следующее:
When(StartupCompleted)
.Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))
Затем я получаю выполнение Console.WriteLine. Насколько я понимаю, событие было получено, но перенаправлено в обработчик Initially, поскольку с correlationId не существует саги. Если я поставлю точку останова на этом этапе и проверю репозиторий саги, саги еще нет.
Возможно, стоит упомянуть еще несколько моментов:
- У меня установлен контекст репозитория саги для использования IsolationLevel.Serializable
- Я использую EntityFrameworkSagaRepository
- Все работает, как ожидалось, когда счетчик предварительной выборки установлен на 1
- Я использую Ninject для DI, а мой SagaRepository имеет область видимости потоков, поэтому я предполагаю, что каждый обработчик, который разрешает счетчик предварительной выборки, имеет свою собственную копию репозитория саги.
- Если я опубликую StartupCompletedEvent в отдельном потоке со спящим 1000 мс перед ним, все будет работать правильно. Я предполагаю, что это связано с тем, что репо саги завершило сохранение состояния саги, поэтому, когда событие в конечном итоге публикуется и принимается обработчиком, состояние саги извлекается из репо правильно.
Пожалуйста, дайте мне знать, если я что-то упустил; Я попытался предоставить все, что считаю целесообразным, не затягивая этот вопрос ...