Конвейеры обработки данных и обработка временных сбоев, кажется, идут рука об руку, поэтому мне интересно посмотреть, смогу ли я получить 2 лучшие библиотеки для них — TPL Dataflow и Polly, соответственно - для приятной игры вместе.
В качестве отправной точки я хотел бы применить политику обработки ошибок к ActionBlock
. В идеале я хотел бы инкапсулировать его в методе создания блоков с такой подписью:
ITargetBlock<T> CreatePollyBlock<T>(
Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)
Было бы достаточно просто policy.Execute
выполнить действие внутри ActionBlock
, но у меня есть два требования:
- В случае повторной попытки я не хочу, чтобы повторная попытка элемента имела приоритет над другими элементами в очереди. Другими словами, когда вы терпите неудачу, вы идете в конец очереди.
- Что еще более важно, если перед повторной попыткой есть период ожидания, я не хочу, чтобы это блокировало получение новых элементов. И если установлено
ExecutionDataflowBlockOptions.MaxDegreeOfParallelism
, я не хочу, чтобы элементы, ожидающие повторной попытки, «учитывались» в этом макс.
Чтобы удовлетворить эти требования, я полагаю, мне нужен «внутренний» ActionBlock
с примененным пользователем ExecutionDataflowBlockOptions
и некоторый «внешний» блок, который отправляет элементы во внутренний блок и применяет любую логику ожидания и повторной попытки (или что-то другое, что диктует политика). ) вне контекста внутреннего блока. Вот моя первая попытка:
// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();
public T Data { get; set; }
public Task Completion => _tcs.Task;
public void SetCompleted() => _tcs.SetResult(0);
public void SetFailed(Exception ex) => _tcs.SetException(ex);
}
ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
// create a block that marks WorkItems completed, and allows
// items to fault without faulting the entire block.
var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
try {
act(wi.Data);
wi.SetCompleted();
}
catch (Exception ex) {
wi.SetFailed(ex);
}
}, opts);
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
});
}
Чтобы проверить это, я создал блок с политикой ожидания и повторения и фиктивный метод, который выдает исключение при первых трех вызовах (для всего приложения). Затем я передал ему некоторые данные:
"a", "b", "c", "d", "e", "f"
Я ожидаю, что a, b и c потерпят неудачу и окажутся в конце очереди. Но я заметил, что они поражают действие внутреннего блока в следующем порядке:
"a", "a", "a", "a", "b", "c", "d", "e", "f"
По сути, я не смог выполнить свои собственные требования, и довольно легко понять, почему: внешний блок не пропускает новые элементы, пока не будут выполнены все повторные попытки текущего элемента. Простое, но, казалось бы, хакерское решение — добавить большое значение MaxDegreeOfParallelism
во внешний блок:
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
С этим изменением я заметил, что новые предметы действительно попадают до повторных попыток, но я также вызвал некоторый хаос. Вход во внутренний блок стал случайным, хотя первые 3 элемента всегда в конце:
"a", "e", "b", "c", "d", "a", "e", "b"
Так что это немного лучше. Но в идеале я хотел бы сохранить порядок:
"a", "b", "c", "d", "e", "a", "b", "c"
Здесь я застрял, и, рассуждая об этом, мне интересно, возможно ли это вообще при этих ограничениях, в частности, что внутренние компоненты CreatePollyBlock
могут выполнять политику, но не могут определять это. Если бы эти внутренние компоненты могли, например, предоставить лямбду повторных попыток, я думаю, это дало бы мне гораздо больше возможностей. Но это часть определения политики, и в соответствии с этим дизайном я не могу этого сделать.
Заранее благодарю за любую помощь.
DataflowBlockOptions.TaskScheduler
, чтобы указатьQueuedTaskScheduler
из ParallelExtensionsExtras в качестве планировщика для внутренних блоков. Затем используйте функцию приоритетной очередиQueuedTaskScheduler
для создания приоритетных очередей в соответствии с номером попытки. Поместите первую попытку в очередьpriority:0
, первую повторную попытку в очередьpriority:1
, вторую повторную попытку вpriority:2
и т. д. Это поместит все первые повторные попытки после первоначальных попыток, все вторые повторные попытки после 1-х повторных попыток и т. д. Это довольно сложное решение, и я было бы интересно услышать что-нибудь попроще. - person mountain traveller   schedule 06.09.2018Envelope<T>
, который содержит поля состояния и повторных попыток.Link
может иметь предикат, который можно использовать для направления сообщений в следующий блок, блок сброса или исходный блок для повторной попытки. - person Panagiotis Kanavos   schedule 07.09.2018WorkItem
), не позволяет распространяться исключениям (захвачено в задачеWorkItem.Completion
) и повторно публикует в случае сбоя. Эта попытка решения была вдохновлена, по крайней мере частично, двумя ответами на вопрос, на который вы ссылались, поэтому я думаю, что мы на одной странице. :) - person Todd Menier   schedule 07.09.2018CreatePollyBlock
, которую я пытаюсь реализовать (в частности, полностью определенная политика передается из внешнего мира), и 2 пункта о том, чтобы пропускать новые элементы перед повторными попытками и ожидающие элементы не учитываются. против максимального параллелизма блока. Именно из-за этих ограничений я ввел 2 блока, и они не применяются к ответу Свика. - person Todd Menier   schedule 07.09.2018Task.Delay(...)
для ожидания: это зависит от того, эффективно лиTask.Delay(...)
освобождает слот параллелизма с точки зрения TPL. - person mountain traveller   schedule 07.09.2018act
по сравнению с временным разделением входных данных могут повлиять на результат в реальном мире. Также важно знать, является ли испытательный стенд репрезентативным в этом отношении. - person mountain traveller   schedule 07.09.2018mustPreserveOrder
. Если false, используйте мою текущую реализацию. Если это правда, все намного проще - элемент B не может запуститься, пока элемент A не выполнит все свои повторные попытки и не ожидает, поэтому все это можно просто инкапсулировать в 1 блок. false = гарантия потери заказа в обмен на намного большую пропускную способность. - person Todd Menier   schedule 07.09.2018