Как создать бесконечную сетку DataFlow с обработкой исключений?

Я создаю процессор задач, который использует TPL DataFlow. Я буду следовать модели производителя и потребителя, в которой производитель производит некоторые товары, которые время от времени обрабатываются, а потребители продолжают ждать прибытия новых товаров. Вот мой код:

async Task Main()
{
    var runner = new Runner();
    CancellationTokenSource cts = new CancellationTokenSource();
    Task runnerTask = runner.ExecuteAsync(cts.Token);

    await Task.WhenAll(runnerTask);
}

public class Runner
{
    public async Task ExecuteAsync(CancellationToken cancellationToken) {
        var random = new Random();

        ActionMeshProcessor processor = new ActionMeshProcessor();
        await processor.Init(cancellationToken);

        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

            int[] items = GetItems(random.Next(3, 7));

            await processor.ProcessBlockAsync(items);
        }
    }

    private int[] GetItems(int count)
    {
        Random randNum = new Random();

        int[] arr = new int[count];
        for (int i = 0; i < count; i++)
        {
            arr[i] = randNum.Next(10, 20);
        }

        return arr;
    }
}

public class ActionMeshProcessor
{
    private TransformBlock<int, int> Transformer { get; set; }
    private ActionBlock<int> CompletionAnnouncer { get; set; }

    public async Task Init(CancellationToken cancellationToken)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int>(async input => {

            await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

            if (input > 15)
            {
                throw new Exception($"I can't handle this number: {input}");
            }

            return input + 1;
        }, options);

        this.CompletionAnnouncer = new ActionBlock<int>(async input =>
        {
            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        this.Transformer.LinkTo(this.CompletionAnnouncer);

        await Task.FromResult(0); // what do I await here?
    }

    public async Task ProcessBlockAsync(int[] arr)
    {
        foreach (var item in arr)
        {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }       
    }
}

Я добавил проверку условия выше, чтобы создать исключение, имитирующее исключительный случай.

Вот мои вопросы:

  • Как лучше всего обрабатывать исключения в указанной выше сетке, не разрушая всю сетку?

  • Есть ли лучший способ инициализировать / запустить / продолжить бесконечную сетку DataFlow?

  • Где мне ждать завершения?

Я просмотрел этот аналогичный вопрос


person Amit    schedule 24.04.2017    source источник


Ответы (1)


Исключения

В вашем init нет ничего асинхронного, это может быть стандартный синхронный конструктор. Вы можете обрабатывать исключения в своей сетке, не снимая ее, с помощью простой попытки поймать в lamda, которую вы предоставляете блоку. Затем вы можете обработать этот случай, либо отфильтровав результат вашей сетки, либо проигнорировав результат в следующих блоках. Ниже приведен пример фильтрации. В простом случае int вы можете использовать int? и отфильтровать любое значение, которое было null, или, конечно, вы можете установить любое значение магического индикатора, если хотите. Если вы действительно передаете ссылочный тип, вы можете либо вытолкнуть null, либо пометить элемент данных как грязный таким образом, чтобы это можно было проверить с помощью предиката в вашей ссылке.

public class ActionMeshProcessor {
    private TransformBlock<int, int?> Transformer { get; set; }
    private ActionBlock<int?> CompletionAnnouncer { get; set; }

    public ActionMeshProcessor(CancellationToken cancellationToken) {
        var options = new ExecutionDataflowBlockOptions {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int?>(async input => {
            try {
                await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

                if (input > 15) {
                    throw new Exception($"I can't handle this number: {input}");
                }

                return input + 1;
            } catch (Exception ex) {
                return null;
            }

        }, options);

        this.CompletionAnnouncer = new ActionBlock<int?>(async input =>
        {
            if (input == null) throw new ArgumentNullException("input");

            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        //Filtering
        this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null);
        this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
    }

    public async Task ProcessBlockAsync(int[] arr) {
        foreach (var item in arr) {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }
    }
}

Завершение

Вы можете выставить Complete() и Completion из своего процессора и использовать их для await завершения при завершении работы приложения, предполагая, что это единственный раз, когда вы завершаете работу меша. Кроме того, убедитесь, что вы правильно распространяете завершение по ссылкам.

    //Filtering
    this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null);
    this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
}        

public void Complete() {
    Transformer.Complete();
}

public Task Completion {
    get { return CompletionAnnouncer.Completion; }
}

Затем, исходя из вашего образца, наиболее вероятное место для завершения находится за пределами цикла, управляющего вашей обработкой:

public async Task ExecuteAsync(CancellationToken cancellationToken) {
    var random = new Random();

    ActionMeshProcessor processor = new ActionMeshProcessor();
    await processor.Init(cancellationToken);

    while (!cancellationToken.IsCancellationRequested) {
        await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

        int[] items = GetItems(random.Next(3, 7));

        await processor.ProcessBlockAsync(items);
    }
    //asuming you don't intend to throw from cancellation
    processor.Complete();
    await processor.Completion();

}
person JSteward    schedule 24.04.2017
comment
это выглядит хорошо, Спасибо @JSteward, я буду отлаживать и посмотрю, попаду ли я в какие-либо другие крайние случаи и обновлюсь! - person Amit; 25.04.2017