Обработка перехода в состояние для нескольких событий

У меня есть MassTransitStateMachine, который управляет процессом, который включает создание нескольких событий.

Как только все события будут выполнены, я хочу, чтобы состояние перешло в фазу «очистки».

Вот соответствующее объявление состояния и функция фильтрации:

        During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark this source as done. 
                .Then(MarkImportCompletedForLocation),

            When(DataImported, IsAllDataImported)
                // Once all are done, we can transition to cleaning up...
                .Then(CleanUpSources)
                .TransitionTo(CleaningUp)
        );


    ...snip...


    private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
    {
        return ctx.Instance.Locations.Values.All(x => x);
    }

Итак, пока состояние - ImportingData, я ожидаю получить несколько событий DataImported. Каждое событие отмечает свое местоположение как выполненное, чтобы метод IsAllDataImported мог определить, следует ли нам перейти к следующему состоянию.

Однако, если два последних события DataImported прибывают в одно и то же время, обработчик для перехода к фазе CleaningUp срабатывает дважды, и я дважды пытаюсь выполнить очистку. .

Я мог решить эту проблему в собственном коде, но ожидал, что конечный автомат справится с этим. Я что-то делаю не так, или мне просто нужно разобраться с разногласиями самостоятельно?


person Robert    schedule 18.04.2016    source источник


Ответы (2)


Решение, предложенное Крисом, не сработает в моей ситуации, потому что у меня прибывает несколько событий одного типа. Мне нужно переходить, только когда все эти события наступят. Конструкция CompositeEvent не работает для этого варианта использования.

Моим решением было вызвать новое событие AllDataImported во время метода MarkImportCompletedForLocation. Этот метод теперь определяет, завершены ли все субимпорты, в потокобезопасном режиме.

Итак, мое определение конечного автомата:

            During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark the URI in the locations list as done. 
                .Then(MarkImportCompletedForLocation),

            When(AllDataImported)
                // Once all are done, we can transition to cleaning up...
                .TransitionTo(CleaningUp)
                .Then(CleanUpSources)
        );

Метод IsAllDataImported больше не нужен в качестве фильтра.

Состояние саги имеет свойство Locations:

public Dictionary<Uri, bool> Locations { get; set; }

А метод MarkImportCompletedForLocation определяется следующим образом:

    private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
    {
        lock (ctx.Instance.Locations)
        {
            ctx.Instance.Locations[ctx.Data.ImportSource] = true;
            if (ctx.Instance.Locations.Values.All(x => x))
            {
                var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
                this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
            }
        }
    }

(Я только что написал это, чтобы понять, как будет работать общий поток; я понимаю, что метод MarkImportCompletedForLocation должен быть более защитным, проверяя, существуют ли ключи в словаре.)

person Robert    schedule 18.04.2016
comment
Интересное решение, я сам сталкиваюсь с аналогичной проблемой, когда мне нужно отслеживать выполнение ряда событий одного типа. Как вам удалось сохранить коллекцию словарей до состояния саги? Я использую расширения EntityFramework для сохранения состояния саги, но это не поддерживает хранение словарей. Вы используете библиотеку расширений NHibernate? - person simon_d; 11.07.2016
comment
Извините @simon_d Я только что увидел ваш комментарий. Я использую некоторые уловки, с помощью которых я сохраняю состояние моей саги как структуру JSON в моем репозитории. Это означает, что мне не нужно беспокоиться об изменении схемы БД, когда моя структура состояния изменяется (например, когда я обновляю свое приложение). Таким образом, свойство типа словаря сериализуется / десериализуется без каких-либо проблем. - person Robert; 09.12.2016

Вы можете использовать составное событие для накопления нескольких событий в последующее событие, которое срабатывает при срабатывании зависимых событий. Это определяется с помощью:

CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);

During(ImportingData,
    When(DataImported)
        .Then(context => { do something with data }),
    When(MoreDataImported)
        .Then(context => { do smoething with more data}),
    When(AllDataImported)
        .Then(context => { okay, have all data now}));

Затем в вашем экземпляре состояния конечного автомата:

class DataImportSagaState :
    SagaStateMachineInstance
{
    public int ImportStatus { get; set; }
}

Это должно решить проблему, которую вы пытаетесь решить, так что попробуйте. Обратите внимание, что порядок событий не имеет значения, они могут поступать в любом порядке, поскольку состояние того, какие события были получены, находится в свойстве ImportStatus экземпляра.

Данные отдельных событий не сохраняются, поэтому вам нужно будет записать их в экземпляр состояния самостоятельно, используя .Then() методы.

person Chris Patterson    schedule 18.04.2016
comment
Привет, спасибо за ответ. Мне интересно, действительно ли это решит мою проблему. Ваш пример, похоже, работает с несколькими событиями разных типов, тогда как у меня есть несколько экземпляров событий одного типа. - person Robert; 18.04.2016
comment
Импорт данных делится между несколькими обработчиками, каждый из которых будет отвечать за подмножество общих данных, подлежащих импорту. По мере завершения каждого этапа мне нужно решить, уместно ли переходить к следующему этапу. Таким образом, каждый обработчик будет запускать один и тот же тип события, но с другим идентификатором. - person Robert; 18.04.2016
comment
Ах, ну да, именно здесь вам нужно отслеживать, что вы получили сами, поскольку вы, вероятно, также будете иметь дело с заказами и другими вещами. Не думаю, что это встроенный стиль работы. Однако вы можете опубликовать другое событие для себя, когда закончите. - person Chris Patterson; 18.04.2016
comment
Да, порядок в данном случае не важен, но я понимаю, о чем вы говорите. Я отправлю свое решение как отдельный ответ, чтобы помочь всем, у кого есть этот вопрос. - person Robert; 19.04.2016