Я прошел через поток данных TPL и столкнулся с очень раздражающей проблемой, возникшей в коде, использующем TrasformBlock
, связанный с ActionBlock
.
В конце концов я обнаружил, что элементы застряли в выходной очереди TransformBlock
, так как его свойство OutputCount
постоянно возвращало значение выше "0". Вот почему все приложение заблокировано. Однако он разблокируется, как только я позвоню TransformBlock.TryReceiveAll()
.
Может кто-нибудь, пожалуйста, дайте мне знать, если я что-то пропустил или как предотвратить такое поведение?
static void Main()
{
int total = 0;
int itemsProcessing = 0;
TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
i => new Tuple<int, double>(i, Math.Sqrt(i)),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 20,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
{
await Task.Delay(1000); // simulating data output delay
Interlocked.Decrement(ref itemsProcessing);
},
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
transformBlock.Completion.ContinueWith(t => outputBlock.Complete());
using (Timer timer = new Timer(o =>
{
Console.Title = string.Format(
"{0}: {1}/{2} {3}/{4}/{5}",
Assembly.GetExecutingAssembly().GetName().Name,
Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
}, null, 100, 100))
{
using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
for (int i = 0; i < 40; i++)
{
Thread.Sleep(100); // simulating new item retrieval delay
Interlocked.Increment(ref total);
Interlocked.Increment(ref itemsProcessing);
transformBlock.SendAsync(i).Wait();
}
}
Console.WriteLine("Enqueued");
transformBlock.Complete();
outputBlock.Completion.Wait();
Console.WriteLine("Finish");
timer.Change(Timeout.Infinite, Timeout.Infinite);
timer.Dispose();
}
}