Я создаю процессор задач, который использует TPL DataFlow. Я буду следовать модели производителя и потребителя, в которой производитель производит некоторые товары, которые время от времени обрабатываются, а потребители продолжают ждать прибытия новых товаров. Вот мой код:
async Task Main()
{
var runner = new Runner();
CancellationTokenSource cts = new CancellationTokenSource();
Task runnerTask = runner.ExecuteAsync(cts.Token);
await Task.WhenAll(runnerTask);
}
public class Runner
{
public async Task ExecuteAsync(CancellationToken cancellationToken) {
var random = new Random();
ActionMeshProcessor processor = new ActionMeshProcessor();
await processor.Init(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more
int[] items = GetItems(random.Next(3, 7));
await processor.ProcessBlockAsync(items);
}
}
private int[] GetItems(int count)
{
Random randNum = new Random();
int[] arr = new int[count];
for (int i = 0; i < count; i++)
{
arr[i] = randNum.Next(10, 20);
}
return arr;
}
}
public class ActionMeshProcessor
{
private TransformBlock<int, int> Transformer { get; set; }
private ActionBlock<int> CompletionAnnouncer { get; set; }
public async Task Init(CancellationToken cancellationToken)
{
var options = new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 5,
BoundedCapacity = 5
};
this.Transformer = new TransformBlock<int, int>(async input => {
await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!
if (input > 15)
{
throw new Exception($"I can't handle this number: {input}");
}
return input + 1;
}, options);
this.CompletionAnnouncer = new ActionBlock<int>(async input =>
{
Console.WriteLine($"Completed: {input}");
await Task.FromResult(0);
}, options);
this.Transformer.LinkTo(this.CompletionAnnouncer);
await Task.FromResult(0); // what do I await here?
}
public async Task ProcessBlockAsync(int[] arr)
{
foreach (var item in arr)
{
await this.Transformer.SendAsync(item); // await if there are no free slots
}
}
}
Я добавил проверку условия выше, чтобы создать исключение, имитирующее исключительный случай.
Вот мои вопросы:
Как лучше всего обрабатывать исключения в указанной выше сетке, не разрушая всю сетку?
Есть ли лучший способ инициализировать / запустить / продолжить бесконечную сетку DataFlow?
Где мне ждать завершения?
Я просмотрел этот аналогичный вопрос