Вы можете представить запрос LINQ как атомарную конструкцию с одним планом выполнения, но может быть полезнее представить его как конвейер, состоящий из нескольких блоков потока данных. Выход каждого блока становится входом следующего блока в потоке данных, и блоки обрабатывают элементы одновременно, как только они становятся доступными. Взгляните, например, на следующий запрос, состоящий из двух блоков, представленных двумя операторами Select
. Первый блок настроен на обработку 3 элементов одновременно (параллельно), а второй блок настроен на обработку каждого элемента последовательно. Продолжительность обработки каждого элемента составляет 1000 мс для параллельного блока и 500 мс для последовательного блока:
var results = Partitioner
.Create(Enumerable.Range(1, 10), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(3)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
Thread.Sleep(1000); // Simulate some CPU-bound work
return x;
})
.AsSequential()
.Select(x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
Thread.Sleep(500); // Simulate some CPU-bound work
return x;
})
.ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");
Если вы запустите этот код, вы получите такой вывод:
08:32:17.628 [4] Parallel #2
08:32:17.628 [5] Parallel #1
08:32:17.628 [6] Parallel #3
08:32:18.642 [6] Parallel #5
08:32:18.642 [5] Parallel #4
08:32:18.644 [4] Parallel #6
08:32:18.651 [1] Sequential #1
08:32:19.644 [6] Parallel #7
08:32:19.645 [4] Parallel #8
08:32:19.646 [5] Parallel #9
08:32:19.654 [1] Sequential #2
08:32:20.156 [1] Sequential #3
08:32:20.648 [4] Parallel #10
08:32:20.658 [1] Sequential #4
08:32:21.161 [1] Sequential #5
08:32:21.663 [1] Sequential #6
08:32:22.164 [1] Sequential #7
08:32:22.672 [1] Sequential #8
08:32:23.173 [1] Sequential #9
08:32:23.675 [1] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
Обратите внимание, что последовательная обработка уже запущена до завершения всей параллельной обработки. Для достижения этого эффекта я использовал параметры конфигурации EnumerablePartitionerOptions.NoBuffering
a> и ParallelMergeOptions.NotBuffered
, чтобы предотвратить первый блок от буферизации его ввода и вывода.
Для полноты давайте перепишем этот запрос, используя Библиотека потока данных TPL. Код становится более многословным и менее беглым, но контроль выполнения становится более точным, а также становятся доступными асинхронные рабочие процессы (PLINQ не поддерживает асинхронность):
var block1 = new TransformBlock<int, int>(async x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
await Task.Delay(1000); // Simulate some I/O operation
return x;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 3,
EnsureOrdered = true // redundant since EnsureOrdered is the default
});
var block2 = new TransformBlock<int, int>(async x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
+ $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
await Task.Delay(500); // Simulate some I/O operation
return x;
}); // MaxDegreeOfParallelism = 1 is the default
block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
// Feeding the first block
foreach (var x in Enumerable.Range(1, 10))
{
await block1.SendAsync(x);
}
block1.Complete();
var results = new List<int>(); // Collecting the results is a bit painful
while (await block2.OutputAvailableAsync())
{
while (block2.TryReceive(out var result))
{
results.Add(result);
}
}
await block2.Completion;
Console.WriteLine($"Results: {String.Join(", ", results)}");
Выход:
08:59:25.102 [6] Parallel #2
08:59:25.102 [4] Parallel #1
08:59:25.102 [7] Parallel #3
08:59:26.127 [7] Parallel #4
08:59:26.129 [6] Parallel #5
08:59:26.143 [4] Parallel #6
08:59:26.147 [5] Sequential #1
08:59:26.648 [5] Sequential #2
08:59:27.129 [6] Parallel #7
08:59:27.129 [7] Parallel #8
08:59:27.144 [4] Parallel #9
08:59:27.149 [5] Sequential #3
08:59:27.650 [5] Sequential #4
08:59:28.131 [6] Parallel #10
08:59:28.152 [5] Sequential #5
08:59:28.653 [5] Sequential #6
08:59:29.155 [5] Sequential #7
08:59:29.659 [5] Sequential #8
08:59:30.160 [5] Sequential #9
08:59:30.674 [5] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
person
Theodor Zoulias
schedule
15.02.2020
i % 2 == 0
внутри лямбдыWhere
будет выполняться параллельно в нескольких потоках. Это то, что вы хотите?AsSequential
в конце запроса не имеет значения, потому что после этого больше не выполняются вычисления. - person Theodor Zoulias   schedule 13.02.2020