Использование Polly с потоком данных TPL

Конвейеры обработки данных и обработка временных сбоев, кажется, идут рука об руку, поэтому мне интересно посмотреть, смогу ли я получить 2 лучшие библиотеки для них — TPL Dataflow и Polly, соответственно - для приятной игры вместе.

В качестве отправной точки я хотел бы применить политику обработки ошибок к ActionBlock. В идеале я хотел бы инкапсулировать его в методе создания блоков с такой подписью:

ITargetBlock<T> CreatePollyBlock<T>(
    Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)

Было бы достаточно просто policy.Execute выполнить действие внутри ActionBlock, но у меня есть два требования:

  1. В случае повторной попытки я не хочу, чтобы повторная попытка элемента имела приоритет над другими элементами в очереди. Другими словами, когда вы терпите неудачу, вы идете в конец очереди.
  2. Что еще более важно, если перед повторной попыткой есть период ожидания, я не хочу, чтобы это блокировало получение новых элементов. И если установлено ExecutionDataflowBlockOptions.MaxDegreeOfParallelism, я не хочу, чтобы элементы, ожидающие повторной попытки, «учитывались» в этом макс.

Чтобы удовлетворить эти требования, я полагаю, мне нужен «внутренний» ActionBlock с примененным пользователем ExecutionDataflowBlockOptions и некоторый «внешний» блок, который отправляет элементы во внутренний блок и применяет любую логику ожидания и повторной попытки (или что-то другое, что диктует политика). ) вне контекста внутреннего блока. Вот моя первая попытка:

// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
    private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();

    public T Data { get; set; }
    public Task Completion => _tcs.Task;

    public void SetCompleted() => _tcs.SetResult(0);
    public void SetFailed(Exception ex) => _tcs.SetException(ex);
}

ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
    // create a block that marks WorkItems completed, and allows
    // items to fault without faulting the entire block.
    var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
        try {
            act(wi.Data);
            wi.SetCompleted();
        }
        catch (Exception ex) {
            wi.SetFailed(ex);
        }
    }, opts);

    return new ActionBlock<T>(async x => {
        await policy.ExecuteAsync(async () => {
            var workItem = new WorkItem<T> { Data = x };
            await innerBlock.SendAsync(workItem);
            await workItem.Completion;
        });
    });
}

Чтобы проверить это, я создал блок с политикой ожидания и повторения и фиктивный метод, который выдает исключение при первых трех вызовах (для всего приложения). Затем я передал ему некоторые данные:

"a", "b", "c", "d", "e", "f"

Я ожидаю, что a, b и c потерпят неудачу и окажутся в конце очереди. Но я заметил, что они поражают действие внутреннего блока в следующем порядке:

"a", "a", "a", "a", "b", "c", "d", "e", "f"

По сути, я не смог выполнить свои собственные требования, и довольно легко понять, почему: внешний блок не пропускает новые элементы, пока не будут выполнены все повторные попытки текущего элемента. Простое, но, казалось бы, хакерское решение — добавить большое значение MaxDegreeOfParallelism во внешний блок:

return new ActionBlock<T>(async x => {
    await policy.ExecuteAsync(async () => {
        var workItem = new WorkItem<T> { Data = x };
        await innerBlock.SendAsync(workItem);
        await workItem.Completion;
    });
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });

С этим изменением я заметил, что новые предметы действительно попадают до повторных попыток, но я также вызвал некоторый хаос. Вход во внутренний блок стал случайным, хотя первые 3 элемента всегда в конце:

"a", "e", "b", "c", "d", "a", "e", "b"

Так что это немного лучше. Но в идеале я хотел бы сохранить порядок:

"a", "b", "c", "d", "e", "a", "b", "c"

Здесь я застрял, и, рассуждая об этом, мне интересно, возможно ли это вообще при этих ограничениях, в частности, что внутренние компоненты CreatePollyBlock могут выполнять политику, но не могут определять это. Если бы эти внутренние компоненты могли, например, предоставить лямбду повторных попыток, я думаю, это дало бы мне гораздо больше возможностей. Но это часть определения политики, и в соответствии с этим дизайном я не могу этого сделать.

Заранее благодарю за любую помощь.


person Todd Menier    schedule 06.09.2018    source источник
comment
Мне интересно, можете ли вы использовать DataflowBlockOptions.TaskScheduler, чтобы указать QueuedTaskScheduler из ParallelExtensionsExtras в качестве планировщика для внутренних блоков. Затем используйте функцию приоритетной очереди QueuedTaskScheduler для создания приоритетных очередей в соответствии с номером попытки. Поместите первую попытку в очередь priority:0, первую повторную попытку в очередь priority:1, вторую повторную попытку в priority:2 и т. д. Это поместит все первые повторные попытки после первоначальных попыток, все вторые повторные попытки после 1-х повторных попыток и т. д. Это довольно сложное решение, и я было бы интересно услышать что-нибудь попроще.   -  person mountain traveller    schedule 06.09.2018
comment
Каким бы ни было решение для сохранения порядка обработки блоков при начальной попытке/приоритете, если время до отказа операции является случайным, существует ли какой-нибудь простой способ с помощью этой схемы сохранить порядок при повторных попытках? Если a отказывает медленно, а e — быстро, повторная попытка e, скорее всего, будет предшествовать повторной попытке a.   -  person mountain traveller    schedule 06.09.2018
comment
@mountaintraveller Спасибо (и отличная библиотека). Хороший вопрос о порядке повторных попыток - я хочу, чтобы они были повторно поставлены в очередь сразу после неудачи, поэтому повторные попытки не обязательно будут соответствовать порядку, в котором они были предприняты впервые. Я действительно думаю, что разумно ожидать, что первоначальные попытки будут происходить в том порядке, в котором они опубликованы. Я почитаю о QTS, но не могу не думать, что здесь есть решение, которое просто включает в себя составление блоков определенным образом.   -  person Todd Menier    schedule 06.09.2018
comment
@ToddMenier Потоки данных аналогичны очередям и обработке сообщений. Повторная попытка означает повторную отправку сообщения во входную очередь. Повторы и другие данные маршрутизации хранятся в конверте сообщения. Блоки потока данных уже имеют очереди ввода/вывода. Вместо отправки необработанных сообщений используйте класс Envelope<T>, который содержит поля состояния и повторных попыток. Link может иметь предикат, который можно использовать для направления сообщений в следующий блок, блок сброса или исходный блок для повторной попытки.   -  person Panagiotis Kanavos    schedule 07.09.2018
comment
@ToddMenier, или вы можете использовать Polly и сделать репост действия восстановления в исходный блок после уменьшения счетчика повторных попыток. В любом случае, не допускайте распространения исключений, инкапсулируйте их в конверты ошибок.   -  person Panagiotis Kanavos    schedule 07.09.2018
comment
@ToddMenier ответы на этот вопрос описывают оба варианта. Логика повтора в ответе svick может быть заменена Polly, если функция восстановления отправляет сообщение в исходный блок. Стивен Клири объясняет второй вариант, ссылаясь на исходный блок.   -  person Panagiotis Kanavos    schedule 07.09.2018
comment
@PanagiotisKanavos Может быть, мне нужно больше комментариев в моем коде, потому что он в основном делает все, что вы описываете, — использует конверт (я называю его WorkItem), не позволяет распространяться исключениям (захвачено в задаче WorkItem.Completion) и повторно публикует в случае сбоя. Эта попытка решения была вдохновлена, по крайней мере частично, двумя ответами на вопрос, на который вы ссылались, поэтому я думаю, что мы на одной странице. :)   -  person Todd Menier    schedule 07.09.2018
comment
@ToddMenier, вам нужно удалить код. WorkItem не требует ничего, кроме данных и счетчика повторных попыток, поэтому при следующей обработке рабочего элемента рабочая функция может проверить, следует ли повторить попытку или нет. Обработка потока данных аналогична обработке сообщений. В задачи message не входит обработка маршрутизации или повторных попыток.   -  person Panagiotis Kanavos    schedule 07.09.2018
comment
@ToddMenier политика также должна вызываться внутри блока. Он должен перехватывать исключение и размещать его в исходном блоке. Вам не нужны два блока. Если вы проверите код svick, он сначала определяет переменную блока, а затем создает экземпляр блока, что позволяет ему ссылаться на блок изнутри рабочего делегата.   -  person Panagiotis Kanavos    schedule 07.09.2018
comment
@PanagiotisKanavos Спасибо за вашу помощь. Похоже, вы думаете, что мне нужно вернуться к чертежной доске, и я бы очень хотел получить ответ, если вы думаете, что сможете решить эту проблему. Просто имейте в виду мои ограничения: подпись метода CreatePollyBlock, которую я пытаюсь реализовать (в частности, полностью определенная политика передается из внешнего мира), и 2 пункта о том, чтобы пропускать новые элементы перед повторными попытками и ожидающие элементы не учитываются. против максимального параллелизма блока. Именно из-за этих ограничений я ввел 2 блока, и они не применяются к ответу Свика.   -  person Todd Menier    schedule 07.09.2018
comment
Спасибо @PanagiotisKanavos за очень точные ссылки на ответы Свика и Стивена Клири. Если @ToddMenier использует ограниченный параллелизм в обработке, может потребоваться использование двух отдельных блоков, так как только с одним блоком, работающим в условиях ограниченного параллелизма, может возникнуть риск того, что часть параллелизма будет занята «ожиданием». фаза WaitAndRetry, которая расточительна для параллелизма. Политика Полли WaitAndRetryAsync внутри использует Task.Delay(...) для ожидания: это зависит от того, эффективно ли Task.Delay(...) освобождает слот параллелизма с точки зрения TPL.   -  person mountain traveller    schedule 07.09.2018
comment
@ToddMenier В идеале я вижу, что порядок сохранен, возможно, это похоже на некоторое концептуальное противоречие между целями. Параллельная обработка defn открывает возможность для изменения порядка завершения в зависимости от условий гонки? Можно было бы структурировать код так, чтобы порядок вывода был «достаточно» точным для порядка ввода для некоторой заданной производственной среды. В конкретном случае кажется, что, возможно, в игре участвуют расы, стоящие в очереди (внешние на внутренние), и относительные трудности этого по сравнению с act по сравнению с временным разделением входных данных могут повлиять на результат в реальном мире. Также важно знать, является ли испытательный стенд репрезентативным в этом отношении.   -  person mountain traveller    schedule 07.09.2018
comment
@mountaintraveller Вы делаете хорошее замечание. С одной стороны, если максимальный параллелизм равен 1 (по умолчанию), ожидается, что элементы будут обрабатываться последовательно в указанном порядке. Но, с другой стороны, я говорю, что повторные попытки идут в конце очереди, что по определению означает, что элементы могут выполняться в любом порядке. По-прежнему кажется более правильным отправлять первые попытки во внутренний блок по порядку, но с практической точки зрения, возможно, то, что у меня есть, достаточно хорошо.   -  person Todd Menier    schedule 07.09.2018
comment
Я мог бы улучшить сигнатуру моего метода с помощью бита mustPreserveOrder. Если false, используйте мою текущую реализацию. Если это правда, все намного проще - элемент B не может запуститься, пока элемент A не выполнит все свои повторные попытки и не ожидает, поэтому все это можно просто инкапсулировать в 1 блок. false = гарантия потери заказа в обмен на намного большую пропускную способность.   -  person Todd Menier    schedule 07.09.2018