Я попытался сохранить экземпляры саги, используя RedisSagaRepository
; Я хотел запустить Saga в настройке балансировки нагрузки, поэтому не могу использовать InMemorySagaRepository
. Однако после переключения я заметил, что некоторые события, опубликованные Consumers, не обрабатывались Saga. Я проверил очередь - сообщений не увидел.
Я заметил, что это скорее всего произойдет, когда Потребителю потребуется совсем немного времени для обработки команды и публикации события. Эта проблема не возникнет, если я использую InMemorySagaRepository
или добавлю Task.Delay()
в Consumer.Consume()
Я неправильно его использую?
Кроме того, если я хочу запустить Saga в настройке балансировки нагрузки, и если Saga необходимо отправить несколько команд одного и того же типа с использованием словаря для отслеживания полноты (аналогичная логика, как в Обработка перехода в состояние для нескольких событий). При одновременной публикации нескольких событий Consumer, будет ли у меня состояние гонки, если две саги обрабатывают два разных события одновременно? В этом случае будет ли правильно настроен объект «Словарь в состоянии»?
Код доступен здесь
SagaService.ConfigureSagaEndPoint()
- это место, где я переключаюсь между InMemorySagaRepository
и RedisSagaRepository
private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
var stateMachine = new MySagaStateMachine();
try
{
var redisConnectionString = "192.168.99.100:6379";
var redis = ConnectionMultiplexer.Connect(redisConnectionString);
///If we switch to RedisSagaRepository and Consumer publish its response too quick,
///It seems like the consumer published event reached Saga instance before the state is updated
///When it happened, Saga will not process the response event because it is not in the "Processing" state
//var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
var repository = new InMemorySagaRepository<SagaState>();
endpointConfigurator.StateMachineSaga(stateMachine, repository);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
LeafConsumer.Consume - это то место, где мы добавляем Task.Delay ()
public class LeafConsumer : IConsumer<IConsumerRequest>
{
public async Task Consume(ConsumeContext<IConsumerRequest> context)
{
///If MySaga project is using RedisSagaRepository, uncomment await Task.Delay() below
///Otherwise, it seems that the Publish message from Consumer will not be processed
///If using InMemorySagaRepository, code will work without needing Task.Delay
///Maybe I am doing something wrong here with these projects
///Or in real life, we probably have code in Consumer that will take a few milliseconds to complete
///However, we cannot predict latency between Saga and Redis
//await Task.Delay(1000);
Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
await context.Publish<IConsumerProcessed>(new
{
context.Message.CorrelationId,
});
}
}