Исключение потока данных TPL в блоке преобразования с ограниченной емкостью

Мне нужно построить конвейер потока данных TPL, который будет обрабатывать множество сообщений. Поскольку сообщений много, я не могу просто Post их поместить в бесконечную очередь BufferBlock, иначе я столкнусь с проблемами памяти. Поэтому я хочу использовать параметр BoundedCapacity = 1 для отключения очереди и использовать MaxDegreeOfParallelism для использования параллельной обработки задач, поскольку моим TransformBlocks может потребоваться некоторое время для каждого сообщения. Я также использую PropagateCompletion, чтобы завершить все и не распространять по конвейеру.

Но я столкнулся с проблемой обработки ошибок, когда ошибка произошла сразу после первого сообщения: вызов await SendAsync просто переключает мое приложение в бесконечное ожидание.

Я упростил свой случай до примера консольного приложения:

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
    BoundedCapacity = 1
});

var process_block = new ActionBlock<int>(x =>
{
    throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
});

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions { PropagateCompletion = true });

for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync(k);
    Console.WriteLine("Send: {0}", k);
}

data_buffer.Complete();

await process_block.Completion;

person Michael Logutov    schedule 06.02.2014    source источник
comment
В качестве примечания, настройка блока с BoundedCapacity меньшим, чем MaxDegreeOfParallelism, снизит степень параллелизма до значения емкости. Другими словами, блок не может обрабатывать 2 элемента одновременно, если ему разрешено буферизовать только один. Я считаю, что это происходит потому, что после обработки двух элементов он должен сохранить два результата в своем выходном буфере, и у него нет доступного места для двух результатов.   -  person Theodor Zoulias    schedule 08.06.2020
comment
Может быть, да. Но это не интуитивно, по крайней мере, для меня. Я думал, что буфер означает все, что переполняется. Итак, если у нас есть 2 рабочих и 1 буферная емкость, он получает 2 элемента и передает их каждому рабочему и получает еще 1 элемент впереди.   -  person Michael Logutov    schedule 10.06.2020
comment
Что касается ActionBlock, то да, это имело бы смысл, потому что у этого блока есть только входная очередь без выхода. Но на самом деле даже ActionBlocks по какой-то причине подчиняются одному и тому же правилу. Наверное, для последовательности.   -  person Theodor Zoulias    schedule 10.06.2020


Ответы (2)


Это ожидаемое поведение. Если есть неисправность «ниже по потоку», ошибка не распространяется «назад» вверх по сетке. Сетка ожидает, что вы обнаружите эту ошибку (например, через process_block.Completion) и устраните ее.

Если вы хотите распространять ошибки в обратном направлении, у вас может быть await или продолжение на process_block.Completion, которое выдает ошибку вышестоящего блока (ов), если вышестоящий блок (и) дает сбой.

Обратите внимание, что это не единственное возможное решение; вы можете захотеть перестроить эту часть меша или связать источники с альтернативной целью. Исходные блоки не повреждены, поэтому они могут просто продолжить обработку с восстановленной сеткой.

person Stephen Cleary    schedule 06.02.2014
comment
Итак, как мне ждать, пока BufferBlock снова станет доступным для отправки данных? А что мне ждать по окончании очереди данных? - person Michael Logutov; 06.02.2014
comment
Я не уверен, что понимаю ваш вопрос. Когда ActionBlock отказывает, BufferBlock все еще работает; ему просто некуда отправлять свои данные. Если вы подключите отдельный ActionBlock к BufferBlock при выходе из строя первого, то BufferBlock просто продолжит выполнение. Или вы можете просто использовать _6 _ / _ 7_ в своем ActionBlock делегате. - person Stephen Cleary; 06.02.2014
comment
Имеет смысл. Я хотел, чтобы при выходе из строя любого из моих блоков весь конвейер останавливался, а исключение перебрасывалось в основной поток. - person Michael Logutov; 07.02.2014
comment
@MichaelLogutov: Вам придется сделать эту ссылку самостоятельно; то есть добавить продолжение к ActionBlock.Completion, которое вызывает ошибку BufferBlock. - person Stephen Cleary; 07.02.2014
comment
Понятно. Я хочу, чтобы PropagateCompletion тоже распространял неудачу. - person Michael Logutov; 07.02.2014
comment
@MichaelLogutov: Это так. Он будет распространять любые ошибки с BufferBlock на ActionBlock. Но ни данные, ни завершение / ошибки не передаются назад по ссылке. - person Stephen Cleary; 07.02.2014
comment
@stephen, не могли бы вы подробнее рассказать, как можно «отремонтировать» сетку? Я понял, что если блок поврежден, его невозможно восстановить, и лучше всего позволить всей сетке умереть и перезапуститься. - person pnewhook; 12.06.2014
comment
@pnewhook: Я предпочитаю разрушать всю сетку из-за ошибки (и я полагаю, что большинство людей так и поступило бы), но мы не должны этого делать. Если в блоке не распространяется завершение и возникает сбой, то вы можете отсоединить его от других блоков и вставить заменяющий блок (пока остальная часть сетки все еще работает). - person Stephen Cleary; 12.06.2014
comment
Помните, что если вы ожидаете не только process_block, но и блок data_buffer, вы можете закончить с тупиковой ситуацией, поскольку блок data_buffer будет выполняться до завершения только после того, как он обработает существующие элементы. И из-за ограниченной емкости process_block он может никогда не завершиться. - person stil; 09.03.2020

К сожалению, нет встроенного способа распространить завершение в обратном направлении, просто настроив блоки. Это нужно делать вручную. Один из подходов состоит в том, чтобы установить канал обратного распространения для каждого канала прямого распространения. Это быстро и легко, когда у вас есть небольшой конвейер, состоящий из 2-3 блоков, но он становится более громоздким и подверженным ошибкам по мере увеличения длины конвейера:

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions { PropagateCompletion = true });
PropagateFailure(process_block, data_buffer); // Propagate backwards

public static async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
{
    try { await block1.Completion.ConfigureAwait(false); } catch { }
    if (block1.Completion.IsFaulted) block2.Fault(block1.Completion.Exception);
}

Та же идея с более интегрированным API:

public static async void BidirectionalLinkTo<T>(this ISourceBlock<T> source,
    ITargetBlock<T> target)
{
    source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
    try { await target.Completion.ConfigureAwait(false); } catch { }
    if (target.Completion.IsFaulted) source.Fault(target.Completion.Exception);
}

data_buffer.BidirectionalLinkTo(process_block);

Другой подход - гарантировать, что весь конвейер будет отменен в случае сбоя любого блока. Это можно сделать, настроив все блоки с CancellationToken из одного источника и прикрепив обработчик к завершению каждого блока, который отменяет источник:

var cts = new CancellationTokenSource();

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
    BoundedCapacity = 1,
    CancellationToken = cts.Token
});
//...more blocks configured with the same cts.Token

OnErrorCancel(data_buffer, cts);
OnErrorCancel(process_block, cts);
//...

async void OnErrorCancel(IDataflowBlock block, CancellationTokenSource cts)
{
    try { await block.Completion.ConfigureAwait(false); } catch { }
    if (block.Completion.IsFaulted) cts.Cancel();
}

Что делает это решение менее привлекательным, так это то, что создание _5 _ создает также обязательство Dispose это сделать, что не всегда тривиально.

person Theodor Zoulias    schedule 08.06.2020