Применим ли TPL-Dataflow для высокопараллельных приложений?

Я изучаю, способен ли TPL-Dataflow избавить нас от написания шаблонного кода с блокировками и мониторами для наших высококонкурентных приложений.

Итак, я моделирую простой сценарий с одним производителем и несколькими потребителями, каждый из которых должен получать все произведенные сообщения. И если одни потребители работают медленнее других, это не должно вызывать стагнацию системы.

Вот код:

using NLog;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp10
{
    internal sealed class Program
    {
        private static readonly Logger m_logger = LogManager.GetCurrentClassLogger();

        static void Main(string[] args)
        {
            BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);

            ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 3
            };

            for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
            {
                int c = consumerIndex;
                ActionBlock<int> consumer = new ActionBlock<int>(
                    (int d) => {
                        m_logger.Trace($"[#{c}] Starting consuming {d}");
                        Thread.Sleep(c * 100);
                        m_logger.Trace($"[#{c}] Ended consuming {d}");
                    },
                    consumerOptions
                );
                root.LinkTo(consumer);
            }

            Producer(10, root);

            Console.ReadLine();
        }


        private static void Producer(int n, ITargetBlock<int> target)
        {
            for (int i = 0; i < n; ++i)
            {
                m_logger.Trace($"Starting producing {i}");
                if (!target.Post(i))
                {
                    throw new Exception($"Failed to post message #{i}");
                }
                m_logger.Trace($"Ending producing {i}");
                Thread.Sleep(50);
            }
        }
    }
}

Как видите, я ограничиваю размер буферов в потребителях до 3 (чтобы предотвратить бесконечный рост буфера для медленных потребителей).

Каждый следующий потребитель работает медленнее предыдущего. Потребитель №0 — самый быстрый без задержек. И производитель имеет небольшую задержку в производстве.

Я ожидаю, что по крайней мере потребитель № 0 будет потреблять все сообщения, а потребитель № 4 не получит некоторые сообщения, потому что его буфер переполнится.

Вот результаты:

2021-04-15 22:44:15.4905 [T1] Starting producing 0 
2021-04-15 22:44:15.5049 [T1] Ending producing 0 
2021-04-15 22:44:15.5166 [T4] [#4] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Ended consuming 0 
2021-04-15 22:44:15.5285 [T7] [#1] Starting consuming 0 
2021-04-15 22:44:15.5573 [T1] Starting producing 1 
2021-04-15 22:44:15.5573 [T1] Ending producing 1 
2021-04-15 22:44:15.5573 [T5] [#0] Starting consuming 1 
2021-04-15 22:44:15.5573 [T5] [#0] Ended consuming 1 
2021-04-15 22:44:15.5573 [T5] [#2] Starting consuming 0 
2021-04-15 22:44:15.5573 [T6] [#3] Starting consuming 0 
2021-04-15 22:44:15.6081 [T1] Starting producing 2 
2021-04-15 22:44:15.6081 [T1] Ending producing 2 
2021-04-15 22:44:15.6352 [T7] [#1] Ended consuming 0 
2021-04-15 22:44:15.6352 [T7] [#1] Starting consuming 1 
2021-04-15 22:44:15.6592 [T1] Starting producing 3 
2021-04-15 22:44:15.6592 [T1] Ending producing 3 
2021-04-15 22:44:15.7102 [T1] Starting producing 4 
2021-04-15 22:44:15.7102 [T1] Ending producing 4 
2021-04-15 22:44:15.7353 [T7] [#1] Ended consuming 1 
2021-04-15 22:44:15.7353 [T7] [#1] Starting consuming 2 
2021-04-15 22:44:15.7612 [T5] [#2] Ended consuming 0 
2021-04-15 22:44:15.7612 [T5] [#2] Starting consuming 1 
2021-04-15 22:44:15.7612 [T1] Starting producing 5 
2021-04-15 22:44:15.7612 [T1] Ending producing 5 
2021-04-15 22:44:15.8132 [T1] Starting producing 6 
2021-04-15 22:44:15.8132 [T1] Ending producing 6 
2021-04-15 22:44:15.8420 [T7] [#1] Ended consuming 2 
2021-04-15 22:44:15.8420 [T7] [#1] Starting consuming 3 
2021-04-15 22:44:15.8603 [T6] [#3] Ended consuming 0 
2021-04-15 22:44:15.8603 [T6] [#3] Starting consuming 1 
2021-04-15 22:44:15.8764 [T1] Starting producing 7 
2021-04-15 22:44:15.8764 [T1] Ending producing 7 
2021-04-15 22:44:15.9174 [T4] [#4] Ended consuming 0 
2021-04-15 22:44:15.9174 [T4] [#4] Starting consuming 1 
2021-04-15 22:44:15.9369 [T1] Starting producing 8 
2021-04-15 22:44:15.9369 [T1] Ending producing 8 
2021-04-15 22:44:15.9509 [T7] [#1] Ended consuming 3 
2021-04-15 22:44:15.9509 [T7] [#1] Starting consuming 4 
2021-04-15 22:44:15.9639 [T5] [#2] Ended consuming 1 
2021-04-15 22:44:15.9639 [T5] [#2] Starting consuming 2 
2021-04-15 22:44:15.9874 [T1] Starting producing 9 
2021-04-15 22:44:15.9874 [T1] Ending producing 9 
2021-04-15 22:44:16.0515 [T7] [#1] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#1] Starting consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Ended consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Starting consuming 6 
2021-04-15 22:44:16.1525 [T6] [#3] Ended consuming 1 
2021-04-15 22:44:16.1525 [T6] [#3] Starting consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Ended consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Starting consuming 4 
2021-04-15 22:44:16.2526 [T7] [#1] Ended consuming 6 
2021-04-15 22:44:16.2526 [T7] [#1] Starting consuming 7 
2021-04-15 22:44:16.3177 [T4] [#4] Ended consuming 1 
2021-04-15 22:44:16.3177 [T4] [#4] Starting consuming 2 
2021-04-15 22:44:16.3537 [T7] [#1] Ended consuming 7 
2021-04-15 22:44:16.3537 [T7] [#1] Starting consuming 9 
2021-04-15 22:44:16.3537 [T5] [#2] Ended consuming 4 
2021-04-15 22:44:16.3537 [T5] [#2] Starting consuming 5 
2021-04-15 22:44:16.4547 [T7] [#1] Ended consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 9 
2021-04-15 22:44:16.4607 [T6] [#3] Ended consuming 2 
2021-04-15 22:44:16.4607 [T6] [#3] Starting consuming 4 
2021-04-15 22:44:16.5648 [T5] [#2] Ended consuming 5 
2021-04-15 22:44:16.5648 [T5] [#2] Starting consuming 9 
2021-04-15 22:44:16.7179 [T4] [#4] Ended consuming 2 
2021-04-15 22:44:16.7179 [T4] [#4] Starting consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Ended consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Starting consuming 9 
2021-04-15 22:44:16.7610 [T5] [#2] Ended consuming 9 
2021-04-15 22:44:17.0611 [T6] [#3] Ended consuming 9 
2021-04-15 22:44:17.1182 [T4] [#4] Ended consuming 4 
2021-04-15 22:44:17.1182 [T4] [#4] Starting consuming 9 
2021-04-15 22:44:17.5185 [T4] [#4] Ended consuming 9 

Что меня озадачивает, так это то, что потребитель № 0 никогда не получает сообщение 8. И на самом деле ни один другой потребитель не получает это сообщение. Почему так? Это ожидаемое поведение для потока данных?

Если вы хотите проверить, мой NLog.config выглядит следующим образом (я использую цель AsyncWrapper, чтобы доступ к файлу не влиял на результаты моего эксперимента):

<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
        internalLogFile="nlog.log"
        internalLogLevel="Warn"
        throwExceptions="false" 
        parseMessageTemplates="false"
    >

  <variable name="varExceptionMsg" value="${exception:format=Message}"/>
  <variable name="varMessageWithException" value="${message}${onexception:inner= ${varExceptionMsg}}"/>
  <variable name="msg4File" value="${longdate} [T${threadid}${threadname}] ${varMessageWithException} ${onexception:inner=${newline}${exception:format=tostring:maxInnerExceptionLevel=2:innerFormat=tostring}}" />

  <targets>

    <target name="file" xsi:type="AsyncWrapper" queueLimit="5000" overflowAction="Discard">
      <target xsi:type="File"
              layout="${msg4File}"
              fileName="${basedir}/logs/${processname}.${shortdate}.log"
              keepFileOpen="true"
              encoding="utf-8"
            />
    </target>
  </targets>

  <rules>
    <logger name="*" minlevel="Trace" writeTo="file" />
  </rules>
</nlog>


person Mikhail    schedule 15.04.2021    source источник
comment
Не могли бы вы попробовать добавить эту строку в начале программы: ThreadPool.SetMinThreads(100, 100); и посмотреть, будет ли это иметь значение? Это предлагается не как исправление, а как способ устранения проблемы, которую вы наблюдаете.   -  person Theodor Zoulias    schedule 15.04.2021
comment
Выйдите из программы, нажав клавишу возврата, а затем войдите в файл журнала.   -  person jdweng    schedule 15.04.2021
comment
@TheodorZoulias кажется, что это помогает, да. Итак, вы полагаете, что источник проблемы в том, что я столкнулся с голоданием пула рабочих потоков?   -  person Mikhail    schedule 16.04.2021
comment
@jdweng, не уверен, что вы имеете в виду, но да, я выхожу с помощью возврата hittnig, и файл действительно очищается. Как вы можете видеть, журнал заполнен — по крайней мере, есть свидетельство того, что 0-й рабочий процесс потреблял 9-е сообщение, которое создается после 8-го, и никакое изменение порядка не могло произойти.   -  person Mikhail    schedule 16.04.2021
comment
Так разве проблема не в флеше? Оборачивая код блоком using, он автоматически сбрасывается.   -  person jdweng    schedule 16.04.2021
comment
Да, я думаю, насыщение пула потоков. Кстати, вопрос в заголовке намного шире, чем вопрос в теле, и поэтому я думаю, что ваш вопрос был отклонен. Каким бы ни было объяснение неиспользованных сообщений в этой конкретной конфигурации, вряд ли это ответ на вопрос о том, применима ли библиотека TPL-Dataflow для приложений с высокой степенью параллельности в целом!   -  person Theodor Zoulias    schedule 16.04.2021
comment
ИМХО, более интересным вопросом было бы, как реализовать пользовательский BroadcastBlock<T> с определенным поведением. Например, BroadcastBlock<T> гарантирует, что каждое полученное сообщение будет доставлено (предложено и принято) по крайней мере одной из связанных целей. Вы можете получить некоторые идеи о том, как создать такой блок в потоке данных TPL">здесь.   -  person Theodor Zoulias    schedule 16.04.2021
comment
@Mikhail DataFlow был создан для приложений с очень высоким уровнем параллелизма, в которых несколько блоков объединяются для формирования конвейера обработки. Это парадигма CSP, которую реализуют каналы Go. Это не просто класс pub/sub. Проблема в самом коде - те Thread.Sleep блокируют рабочие задачи, обрабатывающие сообщения. Там вообще нет трубопровода   -  person Panagiotis Kanavos    schedule 20.04.2021
comment
@Mikhail TPL DataFlow вырос из вдохновленной робототехникой среды выполнения для параллелизма и координации, который, в свою очередь, использует модель коммуникационных последовательных процессов вместо общей памяти, lock- на основе модели, используемой типовыми программами. Блоки/процессы работают в своем собственном потоке, не имеют общего состояния и общаются друг с другом только через сообщения.   -  person Panagiotis Kanavos    schedule 20.04.2021
comment
Короче говоря, пример, который вы привели, неуместен. Вам не нужен пользовательский блок вещания. Вам нужен другой пример и/или структура программы.   -  person Panagiotis Kanavos    schedule 20.04.2021
comment
@PanagiotisKanavos спасибо за ответы, я все еще сомневаюсь. Thread.Sleeps просто эмулируют некоторую реальную рабочую нагрузку, которая может выполняться в разное время. Что касается другой структуры программы — не могли бы вы предложить подходящую для задачи одного производителя, предоставляющего товары нескольким потребителям, где каждый потребитель должен иметь возможность получать все сообщения, если это достаточно быстро? Подумайте, например, об одном источнике журнала и нескольких приемниках асинхронного журнала.   -  person Mikhail    schedule 21.04.2021
comment
@TheodorZoulias да, я понимаю, что название не идеально, но мне трудно придумать лучшее (на самом деле я действительно пытаюсь узнать о плюсах и минусах потока данных TPL, прежде чем переключиться с реализации очередей вручную через замки и мониторы. Мне нравится общая идея дизайна, но я боюсь возможных скрытых подводных камней в реализации. Это первый макет, который я сделал, чтобы попробовать реализовать некоторые реальные случаи из нашего продукта. И я немного беспокоюсь, что я я столкнулся с такой проблемой с первой попытки, так что теперь мне интересно, действительно ли это происходит из-за того, что я неправильно использую TPL?   -  person Mikhail    schedule 21.04.2021
comment
@Mikhail Thread.Sleep не просто эмулирует рабочую нагрузку. Он принудительно приостанавливает поток пула потоков, который может выполнять множество задач, эффективно удаляя его из пула потоков. Среда выполнения должна создать новый поток для обслуживания других задач. Именно для того, чтобы избежать, была создана парадигма потока данных. Ваш автомобиль использует архитектуру потока данных — автомобили имеют несколько десятков однопоточных микроконтроллеров, обменивающихся сообщениями.   -  person Panagiotis Kanavos    schedule 21.04.2021
comment
@Mikhail the task of single producer providing items for multiple consumers pub/sub для Dataflow — это то же самое, что колеса для автомобиля, велосипеда или грузовика. Важная часть вещи, а не сама вещь. И хотя у велосипедов и грузовиков есть колеса, они совсем не одинаковы. Pub/sub это инструмент. Поток данных — это весь план построения для решения проблемы с использованием этого инструмента среди прочего. А one publisher many subscribers описывает только деталь, а не саму проблему   -  person Panagiotis Kanavos    schedule 21.04.2021
comment
@Mikhail, какую актуальную проблему на уровне процесса/архитектуры вы хотите решить? Это не pub/sub. I want to download, parse and use the data in 1000 URLs, combining it with outer sources — это то, что можно разбить на поток данных. Вот почему инструменты ETL, такие как SSIS, используют потоки данных. I want to process a stream of events to detect anomalies/fraud and block malicious IPs — это еще одна вещь, которая может использовать поток данных. В целом, работая с бесконечным потоком событий, можно использовать потоки данных.   -  person Panagiotis Kanavos    schedule 21.04.2021
comment
@Mikhail, если все, что вам нужно, это асинхронная очередь, с другой стороны, используйте Канал. Это низкоуровневая конструкция. Каналы можно использовать для создания потоков данных. На самом деле, если бы Channel был доступен в 2012 году, TPL DataFlow использовал бы его. Чтобы создать конвейер, вам нужно будет добавить собственный асинхронный код для чтения и обработки сообщений. Однако в простых сценариях это может быть так же просто, как await foreach(var message in channelReader.ReadAllAsync())   -  person Panagiotis Kanavos    schedule 21.04.2021
comment
Михаил, если вы ищете в TPL Dataflow инструмент, который может решить все вообразимые проблемы, связанные с параллелизмом, то TPL Dataflow, вероятно, не тот инструмент. Некоторые проблемы могут быть решены с помощью Dataflow легко и эффективно, в то время как другие могут быть решены с большим трудом или вообще не решены. Библиотека не является легко расширяемой. Это то, с чем вам придется бороться, если вы попытаетесь решить слишком необычные/идиоматические/локализованные задачи. Что касается конкретного примера, который вы предоставили, неясно, подходит ли поток данных, потому что вы не указали точно желаемое поведение.   -  person Theodor Zoulias    schedule 21.04.2021


Ответы (1)


Вам нужно выполнить .Complete() для root, затем Main() нужно дождаться, пока все потребители закончат пережевывать свою пищу, прежде чем Main завершится.

В верхней части Main:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

Внутри вашего цикла for:

  • Добавьте всех своих потребителей в список (я назвал это композицией, сокращенно от «Завершения»).
  • Включите linkOptions как часть вызова LinkTo.

Добавьте это перед вашим ReadLine():

root.Complete();
Task.WaitAll(comps.ToArray());
person amonroejj    schedule 07.05.2021
comment
Однако даже с этими обновлениями я получил некоторые странные эффекты, когда потребитель № 4 потреблял меньше товаров в целом, а значение 6 никогда не было получено НИ ОДНИМ потребителем, потому что BroadcastBlock (вопреки интуиции) продолжает свой веселый путь, даже если потребители все тугодумы. Я не знаю много хороших вариантов использования для этого. Этот ответ может иметь обходной путь: stackoverflow.com/questions/22127660/ - person amonroejj; 07.05.2021
comment
Будут ли члены вашей реальной потребительской группы вести себя по-разному? Может быть, группа потребителей будет лучше, чем пытаться заставить BroadcastBlock сделать что-то, что не подходит. - person amonroejj; 07.05.2021
comment
Другая альтернатива, если позволяет память, состоит в том, чтобы связать BroadcastBlock с пятью соседними BufferBlocks и позволить потребителям читать из буферов. - person amonroejj; 07.05.2021
comment
Спасибо за ответ. Это правильный способ дождаться завершения всех заданий вместо Console.ReadLine(), да. Тем не менее, это не отвечает на исходную проблему, почему некоторые элементы отсутствуют даже у самого быстрого потребителя, и как надежно предотвратить это. - person Mikhail; 08.05.2021
comment
Я пробовал. Один буфер на потребителя работал. Все 5 потребителей получили все 10 входов. - person amonroejj; 10.05.2021