Элементы TransformBlock застревают в очереди вывода. Почему и как исправить?

Я прошел через поток данных TPL и столкнулся с очень раздражающей проблемой, возникшей в коде, использующем TrasformBlock, связанный с ActionBlock.

В конце концов я обнаружил, что элементы застряли в выходной очереди TransformBlock, так как его свойство OutputCount постоянно возвращало значение выше "0". Вот почему все приложение заблокировано. Однако он разблокируется, как только я позвоню TransformBlock.TryReceiveAll().

Может кто-нибудь, пожалуйста, дайте мне знать, если я что-то пропустил или как предотвратить такое поведение?

static void Main()
{
    int total = 0;
    int itemsProcessing = 0;

    TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
        i => new Tuple<int, double>(i, Math.Sqrt(i)),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 20,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
        {
            await Task.Delay(1000); // simulating data output delay
            Interlocked.Decrement(ref itemsProcessing);
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 5,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    transformBlock.Completion.ContinueWith(t => outputBlock.Complete());

    using (Timer timer = new Timer(o =>
        {
            Console.Title = string.Format(
                "{0}: {1}/{2} {3}/{4}/{5}",
                Assembly.GetExecutingAssembly().GetName().Name,
                Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
                transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
        }, null, 100, 100))
    {
        using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
        {
            for (int i = 0; i < 40; i++)
            {
                Thread.Sleep(100); // simulating new item retrieval delay

                Interlocked.Increment(ref total);
                Interlocked.Increment(ref itemsProcessing);

                transformBlock.SendAsync(i).Wait();
            }
        }

        Console.WriteLine("Enqueued");

        transformBlock.Complete();
        outputBlock.Completion.Wait();

        Console.WriteLine("Finish");

        timer.Change(Timeout.Infinite, Timeout.Infinite);
        timer.Dispose();
    }
}

person Rauf    schedule 30.09.2015    source источник


Ответы (1)


Вызов TransformBlock.LinkTo возвращает одноразовую регистрацию. Когда вы избавляетесь от этой регистрации, блоки разъединяются.

Ваша область using заканчивается слишком рано, и блоки разъединяются до того, как TransformBlock имеет шанс опустошить себя в ActionBlock, не позволяя ему завершиться. Поскольку первый блок не завершается, следующий даже не начинает завершаться, не говоря уже о завершении.

Перемещение этого ожидания внутрь блока using решает тупиковую ситуацию:

using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
    for (int i = 0; i < 40; i++)
    {
        Thread.Sleep(100); // simulating new item retrieval delay

        Interlocked.Increment(ref total);
        Interlocked.Increment(ref itemsProcessing);

        transformBlock.SendAsync(i).Wait();
    }

    Console.WriteLine("Enqueued");
    transformBlock.Complete();
    outputBlock.Completion.Wait();
    Console.WriteLine("Finish");
}

В качестве примечания, вам действительно не следует блокировать асинхронный код таким образом. Было бы намного проще использовать async-await вместо Wait(), Task.Delay вместо Thread.Sleep и т.д.

Кроме того, поскольку вы используете PropagateCompletion, вам не нужно явно вызывать outputBlock.Complete().

person i3arnon    schedule 30.09.2015
comment
i3arnon, спасибо большое, все заработало. Что касается примечания, это всего лишь тестовое приложение, конечно, в реальном мире я бы попытался использовать async/await вместо простой блокировки. Спасибо еще раз. - person Rauf; 01.10.2015