Как сохранить экземпляры Saga с помощью механизмов хранения и избежать состояния гонки

Я попытался сохранить экземпляры саги, используя RedisSagaRepository; Я хотел запустить Saga в настройке балансировки нагрузки, поэтому не могу использовать InMemorySagaRepository. Однако после переключения я заметил, что некоторые события, опубликованные Consumers, не обрабатывались Saga. Я проверил очередь - сообщений не увидел.

Я заметил, что это скорее всего произойдет, когда Потребителю потребуется совсем немного времени для обработки команды и публикации события. Эта проблема не возникнет, если я использую InMemorySagaRepository или добавлю Task.Delay() в Consumer.Consume()

Я неправильно его использую?

Кроме того, если я хочу запустить Saga в настройке балансировки нагрузки, и если Saga необходимо отправить несколько команд одного и того же типа с использованием словаря для отслеживания полноты (аналогичная логика, как в Обработка перехода в состояние для нескольких событий). При одновременной публикации нескольких событий Consumer, будет ли у меня состояние гонки, если две саги обрабатывают два разных события одновременно? В этом случае будет ли правильно настроен объект «Словарь в состоянии»?

Код доступен здесь

SagaService.ConfigureSagaEndPoint() - это место, где я переключаюсь между InMemorySagaRepository и RedisSagaRepository

private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
    var stateMachine = new MySagaStateMachine();

    try

    {
        var redisConnectionString = "192.168.99.100:6379";
        var redis = ConnectionMultiplexer.Connect(redisConnectionString);

        ///If we switch to RedisSagaRepository and Consumer publish its response too quick,
        ///It seems like the consumer published event reached Saga instance before the state is updated
        ///When it happened, Saga will not process the response event because it is not in the "Processing" state
        //var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
        var repository = new InMemorySagaRepository<SagaState>();

        endpointConfigurator.StateMachineSaga(stateMachine, repository);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

LeafConsumer.Consume - это то место, где мы добавляем Task.Delay ()

public class LeafConsumer : IConsumer<IConsumerRequest>
{
    public async Task Consume(ConsumeContext<IConsumerRequest> context)
    {
        ///If MySaga project is using RedisSagaRepository, uncomment await Task.Delay() below
        ///Otherwise, it seems that the Publish message from Consumer will not be processed
        ///If using InMemorySagaRepository, code will work without needing Task.Delay
        ///Maybe I am doing something wrong here with these projects
        ///Or in real life, we probably have code in Consumer that will take a few milliseconds to complete
        ///However, we cannot predict latency between Saga and Redis
        //await Task.Delay(1000);

        Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
        await context.Publish<IConsumerProcessed>(new
        {
            context.Message.CorrelationId,
        });
    }
}

person Wit B    schedule 29.08.2019    source источник


Ответы (1)


Если у вас есть события, опубликованные таким образом, и вы используете несколько экземпляров службы с нетранзакционным репозиторием саги (например, Redis), вам необходимо разработать свою сагу так, чтобы уникальный идентификатор использовался и обеспечивался Redis. Это предотвращает создание нескольких экземпляров одной и той же саги.

Вам также необходимо принять события в более чем «ожидаемом» состоянии. Например, ожидание получения Start, которое переводит сагу в состояние обработки перед получением другого события только в обработке, скорее всего, не удастся. Рекомендуется разрешить запуск саги (изначально в Automatonymous) любой последовательностью событий, чтобы избежать проблем с доставкой сообщений вне очереди. Пока все события перемещают циферблат слева направо, конечное состояние будет достигнуто. Если более раннее событие получено после более позднего события, оно не должно перемещать состояние назад (или влево, в этом примере), а только добавлять информацию в экземпляр саги и оставлять его в более позднем состоянии.

Если два события обрабатываются в отдельных экземплярах службы, они оба попытаются вставить экземпляр саги в Redis, что приведет к ошибке как дубликат. Затем сообщение следует повторить (добавьте UseMessageRetry () в конечную точку приема), которая затем выберет существующий экземпляр саги и применит событие.

person Chris Patterson    schedule 29.08.2019
comment
Спасибо @chris, предложение принять события в более чем ожидаемом состоянии решило мою проблему; очень признателен. Сможете ли вы указать мне правильное направление при разработке вашей саги, чтобы Redis использовал уникальный идентификатор и обеспечивал его соблюдение? Любые ссылки или поисковые запросы были бы замечательными. На самом деле, любое дополнительное чтение, которое мне следовало бы сделать, было бы очень полезно. Спасибо! - person Wit B; 30.08.2019
comment
Документы по Redis и параллелизму: masstransit-project.com/MassTransit/advanced/ саги / - person Chris Patterson; 30.08.2019