Я изучаю, способен ли TPL-Dataflow избавить нас от написания шаблонного кода с блокировками и мониторами для наших высококонкурентных приложений.
Итак, я моделирую простой сценарий с одним производителем и несколькими потребителями, каждый из которых должен получать все произведенные сообщения. И если одни потребители работают медленнее других, это не должно вызывать стагнацию системы.
Вот код:
using NLog;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApp10
{
internal sealed class Program
{
private static readonly Logger m_logger = LogManager.GetCurrentClassLogger();
static void Main(string[] args)
{
BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);
ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
BoundedCapacity = 3
};
for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
{
int c = consumerIndex;
ActionBlock<int> consumer = new ActionBlock<int>(
(int d) => {
m_logger.Trace($"[#{c}] Starting consuming {d}");
Thread.Sleep(c * 100);
m_logger.Trace($"[#{c}] Ended consuming {d}");
},
consumerOptions
);
root.LinkTo(consumer);
}
Producer(10, root);
Console.ReadLine();
}
private static void Producer(int n, ITargetBlock<int> target)
{
for (int i = 0; i < n; ++i)
{
m_logger.Trace($"Starting producing {i}");
if (!target.Post(i))
{
throw new Exception($"Failed to post message #{i}");
}
m_logger.Trace($"Ending producing {i}");
Thread.Sleep(50);
}
}
}
}
Как видите, я ограничиваю размер буферов в потребителях до 3 (чтобы предотвратить бесконечный рост буфера для медленных потребителей).
Каждый следующий потребитель работает медленнее предыдущего. Потребитель №0 — самый быстрый без задержек. И производитель имеет небольшую задержку в производстве.
Я ожидаю, что по крайней мере потребитель № 0 будет потреблять все сообщения, а потребитель № 4 не получит некоторые сообщения, потому что его буфер переполнится.
Вот результаты:
2021-04-15 22:44:15.4905 [T1] Starting producing 0
2021-04-15 22:44:15.5049 [T1] Ending producing 0
2021-04-15 22:44:15.5166 [T4] [#4] Starting consuming 0
2021-04-15 22:44:15.5285 [T7] [#0] Starting consuming 0
2021-04-15 22:44:15.5285 [T7] [#0] Ended consuming 0
2021-04-15 22:44:15.5285 [T7] [#1] Starting consuming 0
2021-04-15 22:44:15.5573 [T1] Starting producing 1
2021-04-15 22:44:15.5573 [T1] Ending producing 1
2021-04-15 22:44:15.5573 [T5] [#0] Starting consuming 1
2021-04-15 22:44:15.5573 [T5] [#0] Ended consuming 1
2021-04-15 22:44:15.5573 [T5] [#2] Starting consuming 0
2021-04-15 22:44:15.5573 [T6] [#3] Starting consuming 0
2021-04-15 22:44:15.6081 [T1] Starting producing 2
2021-04-15 22:44:15.6081 [T1] Ending producing 2
2021-04-15 22:44:15.6352 [T7] [#1] Ended consuming 0
2021-04-15 22:44:15.6352 [T7] [#1] Starting consuming 1
2021-04-15 22:44:15.6592 [T1] Starting producing 3
2021-04-15 22:44:15.6592 [T1] Ending producing 3
2021-04-15 22:44:15.7102 [T1] Starting producing 4
2021-04-15 22:44:15.7102 [T1] Ending producing 4
2021-04-15 22:44:15.7353 [T7] [#1] Ended consuming 1
2021-04-15 22:44:15.7353 [T7] [#1] Starting consuming 2
2021-04-15 22:44:15.7612 [T5] [#2] Ended consuming 0
2021-04-15 22:44:15.7612 [T5] [#2] Starting consuming 1
2021-04-15 22:44:15.7612 [T1] Starting producing 5
2021-04-15 22:44:15.7612 [T1] Ending producing 5
2021-04-15 22:44:15.8132 [T1] Starting producing 6
2021-04-15 22:44:15.8132 [T1] Ending producing 6
2021-04-15 22:44:15.8420 [T7] [#1] Ended consuming 2
2021-04-15 22:44:15.8420 [T7] [#1] Starting consuming 3
2021-04-15 22:44:15.8603 [T6] [#3] Ended consuming 0
2021-04-15 22:44:15.8603 [T6] [#3] Starting consuming 1
2021-04-15 22:44:15.8764 [T1] Starting producing 7
2021-04-15 22:44:15.8764 [T1] Ending producing 7
2021-04-15 22:44:15.9174 [T4] [#4] Ended consuming 0
2021-04-15 22:44:15.9174 [T4] [#4] Starting consuming 1
2021-04-15 22:44:15.9369 [T1] Starting producing 8
2021-04-15 22:44:15.9369 [T1] Ending producing 8
2021-04-15 22:44:15.9509 [T7] [#1] Ended consuming 3
2021-04-15 22:44:15.9509 [T7] [#1] Starting consuming 4
2021-04-15 22:44:15.9639 [T5] [#2] Ended consuming 1
2021-04-15 22:44:15.9639 [T5] [#2] Starting consuming 2
2021-04-15 22:44:15.9874 [T1] Starting producing 9
2021-04-15 22:44:15.9874 [T1] Ending producing 9
2021-04-15 22:44:16.0515 [T7] [#1] Ended consuming 4
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 2
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 2
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 3
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 3
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 4
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 4
2021-04-15 22:44:16.0515 [T7] [#1] Starting consuming 5
2021-04-15 22:44:16.1525 [T7] [#1] Ended consuming 5
2021-04-15 22:44:16.1525 [T7] [#1] Starting consuming 6
2021-04-15 22:44:16.1525 [T6] [#3] Ended consuming 1
2021-04-15 22:44:16.1525 [T6] [#3] Starting consuming 2
2021-04-15 22:44:16.1645 [T5] [#2] Ended consuming 2
2021-04-15 22:44:16.1645 [T5] [#2] Starting consuming 4
2021-04-15 22:44:16.2526 [T7] [#1] Ended consuming 6
2021-04-15 22:44:16.2526 [T7] [#1] Starting consuming 7
2021-04-15 22:44:16.3177 [T4] [#4] Ended consuming 1
2021-04-15 22:44:16.3177 [T4] [#4] Starting consuming 2
2021-04-15 22:44:16.3537 [T7] [#1] Ended consuming 7
2021-04-15 22:44:16.3537 [T7] [#1] Starting consuming 9
2021-04-15 22:44:16.3537 [T5] [#2] Ended consuming 4
2021-04-15 22:44:16.3537 [T5] [#2] Starting consuming 5
2021-04-15 22:44:16.4547 [T7] [#1] Ended consuming 9
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 5
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 5
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 6
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 6
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 7
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 7
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 9
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 9
2021-04-15 22:44:16.4607 [T6] [#3] Ended consuming 2
2021-04-15 22:44:16.4607 [T6] [#3] Starting consuming 4
2021-04-15 22:44:16.5648 [T5] [#2] Ended consuming 5
2021-04-15 22:44:16.5648 [T5] [#2] Starting consuming 9
2021-04-15 22:44:16.7179 [T4] [#4] Ended consuming 2
2021-04-15 22:44:16.7179 [T4] [#4] Starting consuming 4
2021-04-15 22:44:16.7610 [T6] [#3] Ended consuming 4
2021-04-15 22:44:16.7610 [T6] [#3] Starting consuming 9
2021-04-15 22:44:16.7610 [T5] [#2] Ended consuming 9
2021-04-15 22:44:17.0611 [T6] [#3] Ended consuming 9
2021-04-15 22:44:17.1182 [T4] [#4] Ended consuming 4
2021-04-15 22:44:17.1182 [T4] [#4] Starting consuming 9
2021-04-15 22:44:17.5185 [T4] [#4] Ended consuming 9
Что меня озадачивает, так это то, что потребитель № 0 никогда не получает сообщение 8
. И на самом деле ни один другой потребитель не получает это сообщение. Почему так? Это ожидаемое поведение для потока данных?
Если вы хотите проверить, мой NLog.config выглядит следующим образом (я использую цель AsyncWrapper
, чтобы доступ к файлу не влиял на результаты моего эксперимента):
<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
internalLogFile="nlog.log"
internalLogLevel="Warn"
throwExceptions="false"
parseMessageTemplates="false"
>
<variable name="varExceptionMsg" value="${exception:format=Message}"/>
<variable name="varMessageWithException" value="${message}${onexception:inner= ${varExceptionMsg}}"/>
<variable name="msg4File" value="${longdate} [T${threadid}${threadname}] ${varMessageWithException} ${onexception:inner=${newline}${exception:format=tostring:maxInnerExceptionLevel=2:innerFormat=tostring}}" />
<targets>
<target name="file" xsi:type="AsyncWrapper" queueLimit="5000" overflowAction="Discard">
<target xsi:type="File"
layout="${msg4File}"
fileName="${basedir}/logs/${processname}.${shortdate}.log"
keepFileOpen="true"
encoding="utf-8"
/>
</target>
</targets>
<rules>
<logger name="*" minlevel="Trace" writeTo="file" />
</rules>
</nlog>
ThreadPool.SetMinThreads(100, 100);
и посмотреть, будет ли это иметь значение? Это предлагается не как исправление, а как способ устранения проблемы, которую вы наблюдаете. - person Theodor Zoulias   schedule 15.04.2021BroadcastBlock<T>
с определенным поведением. Например,BroadcastBlock<T>
гарантирует, что каждое полученное сообщение будет доставлено (предложено и принято) по крайней мере одной из связанных целей. Вы можете получить некоторые идеи о том, как создать такой блок в потоке данных TPL">здесь. - person Theodor Zoulias   schedule 16.04.2021Thread.Sleep
блокируют рабочие задачи, обрабатывающие сообщения. Там вообще нет трубопровода - person Panagiotis Kanavos   schedule 20.04.2021Thread.Sleep
s просто эмулируют некоторую реальную рабочую нагрузку, которая может выполняться в разное время. Что касается другой структуры программы — не могли бы вы предложить подходящую для задачи одного производителя, предоставляющего товары нескольким потребителям, где каждый потребитель должен иметь возможность получать все сообщения, если это достаточно быстро? Подумайте, например, об одном источнике журнала и нескольких приемниках асинхронного журнала. - person Mikhail   schedule 21.04.2021Thread.Sleep
не просто эмулирует рабочую нагрузку. Он принудительно приостанавливает поток пула потоков, который может выполнять множество задач, эффективно удаляя его из пула потоков. Среда выполнения должна создать новый поток для обслуживания других задач. Именно для того, чтобы избежать, была создана парадигма потока данных. Ваш автомобиль использует архитектуру потока данных — автомобили имеют несколько десятков однопоточных микроконтроллеров, обменивающихся сообщениями. - person Panagiotis Kanavos   schedule 21.04.2021the task of single producer providing items for multiple consumers
pub/sub для Dataflow — это то же самое, что колеса для автомобиля, велосипеда или грузовика. Важная часть вещи, а не сама вещь. И хотя у велосипедов и грузовиков есть колеса, они совсем не одинаковы.Pub/sub
это инструмент. Поток данных — это весь план построения для решения проблемы с использованием этого инструмента среди прочего. Аone publisher many subscribers
описывает только деталь, а не саму проблему - person Panagiotis Kanavos   schedule 21.04.2021pub/sub
.I want to download, parse and use the data in 1000 URLs, combining it with outer sources
— это то, что можно разбить на поток данных. Вот почему инструменты ETL, такие как SSIS, используют потоки данных.I want to process a stream of events to detect anomalies/fraud and block malicious IPs
— это еще одна вещь, которая может использовать поток данных. В целом, работая с бесконечным потоком событий, можно использовать потоки данных. - person Panagiotis Kanavos   schedule 21.04.2021await foreach(var message in channelReader.ReadAllAsync())
- person Panagiotis Kanavos   schedule 21.04.2021