Как обработать полный список входных данных с помощью TPL Dataflow?

Я новичок в TPL Dataflow, и у меня он работает, но я не уверен, правильно ли я его использую. У меня есть список входных данных (строк), и я хочу обработать их (все) с максимальной степенью параллелизма и знать, когда все это будет завершено. Прямо сейчас я просто foreach просматриваю входы и вызываю Post на ActionBlock, игнорируя возвращаемое значение. Это кажется неправильным, так как входные данные могут быть пропущены.

У меня вопрос: как избежать пропуска предметов? Есть ли встроенный блок, в который я могу просто дать свои входные данные, и он будет следить за тем, чтобы все они были выполнены? (Независимо от успеха / неудачи на ввод.)

Предложения, которые я видел, в основном сводятся к:

await block.Completion;

Это учитывает неудачные входы (где Post или SendAsync вернет false)? Для меня странно то, что кажется, что это определение делается, когда я вызываю Post, а не после, поэтому этот Completion даже не будет включать эти элементы.

Я чувствую, что мне нужен в основном цикл повтора для входов, которые он не мог обработать в предыдущий раз, что-то вроде:

while (items.Count > 0) {
  foreach (var item in items) {
    if (await block.SendAsync(item)) {
      items.Remove(item);
    }
  }

  await block.Completion;
}

block.Complete();

(За исключением улучшенной обработки цикла / проверки ошибок.)

Этот дополнительный уровень не нужен? Или я где-то концептуально ошибаюсь?


person Josh    schedule 29.07.2019    source источник
comment


Ответы (2)


Это кажется неправильным, так как входные данные могут быть пропущены.

Предполагая, что вы используете значения по умолчанию, это правильно. Post возвращает false, только если блок отказывается от ввода. Это может произойти, если блок получил сигнал Complete или если входной буфер блока заполнен. По умолчанию входной буфер каждого блока может расти бесконечно, поэтому ActionBlock с размером входного буфера по умолчанию будет возвращать false из Post только после вызова Complete.

Чаще всего для ActionBlock используется неограниченная ограниченная емкость, и код вызывает Complete только после добавления всех элементов. В этом случае Post никогда не вернет false, и вы можете спокойно игнорировать возвращаемое значение.

person Stephen Cleary    schedule 29.07.2019

Метод Post вернет false, если блок завершен или если входной буфер блока заполнен. Поскольку параметр BoundedCapacity не является чем-то экзотическим и вполне может потребоваться на более поздней стадии проекта для решения возникающих проблем с высоким использованием ОЗУ, я не думаю, что использовать метод Post и просто игнорировать результат - это безопасная ставка. Чтобы защититься от несмешных ошибок, связанных с отсутствием сообщений (которые могут быть заказами или счетами), вы можете сделать что-то вроде этого:

foreach (var item in items)
{
    var accepted = block.Post(item);
    if (!accepted) throw new InvalidOperationException("Item was not accepted");
}

Таким образом вы, по крайней мере, будете уведомлены о том, что что-то сломано, и не допустите появления багов.

С другой стороны, ожидает _5 _ и игнорировать результат намного безопаснее. SendAsync обычно возвращает false в условиях, когда произошло исключение или отмена, и в этом случае вы получите уведомление в момент, когда вы await для Completion блока. Так что в этом случае не нужно создавать исключения.

foreach (var item in items)
{
    await block.SendAsync(item).ConfigureAwait(false);
}

По соображениям производительности вы можете использовать как Post, так и SendAsync. Это будет иметь значение только в том случае, если вам нужно обработать десятки миллионов элементов.

foreach (var item in items)
{
    if (!block.Post(item))
    {
        await block.SendAsync(item).ConfigureAwait(false);
    }
}
person Theodor Zoulias    schedule 14.08.2019