Мне нужно построить конвейер потока данных TPL, который будет обрабатывать множество сообщений. Поскольку сообщений много, я не могу просто Post
их поместить в бесконечную очередь BufferBlock
, иначе я столкнусь с проблемами памяти. Поэтому я хочу использовать параметр BoundedCapacity = 1
для отключения очереди и использовать MaxDegreeOfParallelism
для использования параллельной обработки задач, поскольку моим TransformBlock
s может потребоваться некоторое время для каждого сообщения. Я также использую PropagateCompletion
, чтобы завершить все и не распространять по конвейеру.
Но я столкнулся с проблемой обработки ошибок, когда ошибка произошла сразу после первого сообщения: вызов await SendAsync
просто переключает мое приложение в бесконечное ожидание.
Я упростил свой случай до примера консольного приложения:
var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
BoundedCapacity = 1
});
var process_block = new ActionBlock<int>(x =>
{
throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2,
BoundedCapacity = 1
});
data_buffer.LinkTo(process_block,
new DataflowLinkOptions { PropagateCompletion = true });
for (var k = 1; k <= 5; k++)
{
await data_buffer.SendAsync(k);
Console.WriteLine("Send: {0}", k);
}
data_buffer.Complete();
await process_block.Completion;
BoundedCapacity
меньшим, чемMaxDegreeOfParallelism
, снизит степень параллелизма до значения емкости. Другими словами, блок не может обрабатывать 2 элемента одновременно, если ему разрешено буферизовать только один. Я считаю, что это происходит потому, что после обработки двух элементов он должен сохранить два результата в своем выходном буфере, и у него нет доступного места для двух результатов. - person Theodor Zoulias   schedule 08.06.2020ActionBlock
, то да, это имело бы смысл, потому что у этого блока есть только входная очередь без выхода. Но на самом деле дажеActionBlock
s по какой-то причине подчиняются одному и тому же правилу. Наверное, для последовательности. - person Theodor Zoulias   schedule 10.06.2020