Влияние использования AsParallel() и AsSequential() в одном запросе? С#

Я просматривал PLINQ в одной из книг, и там было сказано:

Если у вас есть сложный запрос, который может выиграть от параллельной обработки, но также содержит некоторые части, которые должны выполняться последовательно, вы можете использовать AsSequential, чтобы остановить параллельную обработку вашего запроса.

Например:

var parallelResult = numbers.AsParallel().AsOrdered()
    .Where(i => i % 2 == 0).AsSequential();

Я хочу понять, почему это разрешено и как это влияет на результат? Он идет параллельно? Работает последовательно? Сейчас это не имеет никакого смысла.


person Muhammad Faraz    schedule 13.02.2020    source источник
comment
Код i % 2 == 0 внутри лямбды Where будет выполняться параллельно в нескольких потоках. Это то, что вы хотите? AsSequential в конце запроса не имеет значения, потому что после этого больше не выполняются вычисления.   -  person Theodor Zoulias    schedule 13.02.2020
comment
@TheodorZoulias Я хочу знать, есть ли какие-либо преимущества от совместного вызова этих двух методов.   -  person Muhammad Faraz    schedule 13.02.2020
comment
Это один из тех случаев, когда... Например, если вы хотите получить данные, вы можете получить все параллельно. Теперь это здорово и будет быстрее, чем делать это последовательно. Но иногда вам придется проделать дополнительную логику, которая должна будет оценивать каждое значение одно за другим. Например, при заказе. Вы не сможете сделать это параллельно, потому что вам нужно сохранить порядок, который был установлен в предыдущем предложении запроса. Вот хороший пример   -  person Scircia    schedule 13.02.2020
comment
О AsParallel() и AsSequential(), возможно, вы можете обратиться к stackoverflow.com/a/29124656/8335151   -  person Kyle Wang - MSFT    schedule 14.02.2020


Ответы (1)


Вы можете представить запрос LINQ как атомарную конструкцию с одним планом выполнения, но может быть полезнее представить его как конвейер, состоящий из нескольких блоков потока данных. Выход каждого блока становится входом следующего блока в потоке данных, и блоки обрабатывают элементы одновременно, как только они становятся доступными. Взгляните, например, на следующий запрос, состоящий из двух блоков, представленных двумя операторами Select. Первый блок настроен на обработку 3 элементов одновременно (параллельно), а второй блок настроен на обработку каждого элемента последовательно. Продолжительность обработки каждого элемента составляет 1000 мс для параллельного блока и 500 мс для последовательного блока:

var results = Partitioner
    .Create(Enumerable.Range(1, 10), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(3)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
        Thread.Sleep(1000); // Simulate some CPU-bound work
        return x;
    })
    .AsSequential()
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
        Thread.Sleep(500); // Simulate some CPU-bound work
        return x;
    })
    .ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

Если вы запустите этот код, вы получите такой вывод:

08:32:17.628 [4] Parallel #2
08:32:17.628 [5] Parallel #1
08:32:17.628 [6] Parallel #3
08:32:18.642 [6] Parallel #5
08:32:18.642 [5] Parallel #4
08:32:18.644 [4] Parallel #6
08:32:18.651 [1] Sequential #1
08:32:19.644 [6] Parallel #7
08:32:19.645 [4] Parallel #8
08:32:19.646 [5] Parallel #9
08:32:19.654 [1] Sequential #2
08:32:20.156 [1] Sequential #3
08:32:20.648 [4] Parallel #10
08:32:20.658 [1] Sequential #4
08:32:21.161 [1] Sequential #5
08:32:21.663 [1] Sequential #6
08:32:22.164 [1] Sequential #7
08:32:22.672 [1] Sequential #8
08:32:23.173 [1] Sequential #9
08:32:23.675 [1] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

Обратите внимание, что последовательная обработка уже запущена до завершения всей параллельной обработки. Для достижения этого эффекта я использовал параметры конфигурации EnumerablePartitionerOptions.NoBuffering и ParallelMergeOptions.NotBuffered, чтобы предотвратить первый блок от буферизации его ввода и вывода.

Для полноты давайте перепишем этот запрос, используя Библиотека потока данных TPL. Код становится более многословным и менее беглым, но контроль выполнения становится более точным, а также становятся доступными асинхронные рабочие процессы (PLINQ не поддерживает асинхронность):

var block1 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
    await Task.Delay(1000); // Simulate some I/O operation
    return x;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 3,
    EnsureOrdered = true // redundant since EnsureOrdered is the default
});

var block2 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
    await Task.Delay(500); // Simulate some I/O operation
    return x;
}); // MaxDegreeOfParallelism = 1 is the default

block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });

// Feeding the first block
foreach (var x in Enumerable.Range(1, 10))
{
    await block1.SendAsync(x);
}
block1.Complete();

var results = new List<int>(); // Collecting the results is a bit painful
while (await block2.OutputAvailableAsync())
{
    while (block2.TryReceive(out var result))
    {
        results.Add(result);
    }
}
await block2.Completion;
Console.WriteLine($"Results: {String.Join(", ", results)}");

Выход:

08:59:25.102 [6] Parallel #2
08:59:25.102 [4] Parallel #1
08:59:25.102 [7] Parallel #3
08:59:26.127 [7] Parallel #4
08:59:26.129 [6] Parallel #5
08:59:26.143 [4] Parallel #6
08:59:26.147 [5] Sequential #1
08:59:26.648 [5] Sequential #2
08:59:27.129 [6] Parallel #7
08:59:27.129 [7] Parallel #8
08:59:27.144 [4] Parallel #9
08:59:27.149 [5] Sequential #3
08:59:27.650 [5] Sequential #4
08:59:28.131 [6] Parallel #10
08:59:28.152 [5] Sequential #5
08:59:28.653 [5] Sequential #6
08:59:29.155 [5] Sequential #7
08:59:29.659 [5] Sequential #8
08:59:30.160 [5] Sequential #9
08:59:30.674 [5] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
person Theodor Zoulias    schedule 15.02.2020