Как сделать parallel.for асинхронных методов

Какой лучший способ параллельной обработки в С # с некоторыми асинхронными методами. Позвольте мне объяснить с помощью простого кода

Пример сценария: у нас есть человек и 1000 текстовых файлов от него. мы хотим проверить, что его текстовые файлы не содержат конфиденциальных ключевых слов, и если один из его текстовых файлов содержит конфиденциальные ключевые слова, мы помечаем его как ненадежный. Метод, который проверяет это, является асинхронным, и, как только мы обнаружили одно из важных ключевых слов, дальнейшая обработка не требуется, и цикл проверки должен быть прерван для этого человека.

Чтобы добиться максимальной производительности и сделать это так быстро, мы должны использовать простой псудокод для параллельной обработки:

boolean sesitivedetected=false;
Parallel.ForEach(textfilecollection,async (textfile,parallelloopstate)=>
{
    if (await hassensitiveasync(textfile))
    {
         sensitivedetected=true;
         parallelloopstate.break()
    }
}
‌if (sensitivedetected)
    markuntrusted(person)

Проблема в том, что Parallel.ForEach не ждут завершения асинхронных задач, поэтому оператор ‌if (sensitivedetected) запускается сразу после завершения создания задачи. Я читал другие вопросы, такие как писать parallel.for с помощью async и async / await и Parallel.For и много других страниц. Эти темы полезны, когда вам нужно, чтобы результаты асинхронных методов были собраны и использованы позже, но в моем сценарии выполнение цикла должно быть завершено как можно скорее.


Обновление: образец кода:

        Boolean detected=false;
        Parallel.ForEach(UrlList,  async (url, pls) =>
          {
              using (HttpClient hc = new HttpClient())
              {
                  var result = await hc.GetAsync(url);

                  if ((await result.Content.ReadAsStringAsync()).Contains("sensitive"))
                  {
                      detected = true;
                      pls.Break();
                  }
              }
          });
        if (detected)
            Console.WriteLine("WARNING");

person X X    schedule 09.05.2017    source источник
comment
Краткая версия: не смешивайте Parallel.For() с async методами. Ваш вопрос действительно слишком широк, как указано; вы не включили хороший минимальный воспроизводимый пример, и существует множество способов интерпретации вашего псевдокода с точки зрения того, что каждая его часть делает в реальном коде. Но вы можете просто запускать задачи вместо использования Parallel, используя CancellationTokenSource по мере необходимости для связи с этими задачами, если их нужно прервать, и, конечно, с циклом, который их создает. Пожалуйста, улучшите вопрос, если вам нужен более конкретный совет.   -  person Peter Duniho    schedule 09.05.2017
comment
Если вам действительно необходимо использовать Parallel.For(), и вы действительно должны использовать метод async, который определяет, следует ли продолжать цикл, и этот метод является единственным ожидаемым выражением в вашем делегате Parallel.For(), тогда вы можете просто использовать его синхронно, то есть if (hassensitiveasync(textfile).Result). Как говорится в деталях реализации, это плохая практика, но если вы загоните себя в угол, иногда вам придется оставить следы.   -  person Peter Duniho    schedule 09.05.2017
comment
Думаю, основная проблема не в реализации или точном подробном коде. Возможно, не существует асинхронной версии библиотечного метода, поэтому у меня есть асинхронная версия метода. Мне нравится использовать параллель, потому что я делаю код максимально быстрым! Если вы хотите знать, почему вы можете смешать Parallel.For с async, прочтите это: ссылка. Я думаю, что этот `AsyncEnumerator ', вероятно, решит мою проблему, но я не знаю, как это сделать. Действительно, если вам нужен более точный код, я предоставлю его в обновленном Вопросе. @PeterDuniho   -  person X X    schedule 09.05.2017
comment
@XX Почему вы вообще используете многопоточность и параллельную обработку. Ваш ВОПРОС неверен. В C # (›4.5) вы не используете Thread (дорогостоящая конструкция) для всего, у нас более развитая парадигма использования Tasks.   -  person Aron    schedule 09.05.2017
comment
Прочтите это, пожалуйста, и вы сможете сообщить, почему: [ссылка] (github.com/tyrotoxin/AsyncEnumerable] @ Арон   -  person X X    schedule 09.05.2017
comment
@XX Ваш вопрос явно связан с вводом-выводом, а не с процессором. Стоимость распределения данных между потоками, пробуждения / сна и обработки дополнительных стеков сделает потоковую передачу бессмысленной. Но, по сути, нарезка резьбы всегда опасна.   -  person Aron    schedule 09.05.2017
comment
@XX Вы используете неправильный класс, чтобы делать неправильные вещи. Parallel.Foreach предназначен для параллелизма данных, т.е. обработки большого количества данных. Он не предназначен для асинхронных операций или реакции на события. Он также не используется для ожидания ответов от уже асинхронных событий, для этого Task.WhenAll. Вы можете запустить 100 GetAsync вызовов, поместить их в массив и ожидать их всех.   -  person Panagiotis Kanavos    schedule 09.05.2017
comment
Параллелизм данных @XX означает, что у вас много данных. Parallel.For/ForEach разделит данные и использует одну задачу для обработки каждого раздела. Когда вы это делаете, вы не хотите и не нуждаетесь в асинхронных операциях.   -  person Panagiotis Kanavos    schedule 09.05.2017
comment
@PanagiotisKanavos В обоих случаях это параллелизм данных. Что вам нужно сделать, так это то, что потоки - это метафора для процессоров. Установка большего количества процессоров на вашу сетевую карту не сделает ее быстрее.   -  person Aron    schedule 09.05.2017
comment
@Aron - это не параллелизм данных в том смысле, в каком его понимают большинство людей, или как он используется для объяснения различных сценариев, охватываемых TPL и компанией. Здесь нет данных для обработки, только сетевые вызовы. OP генерирует много вызовов ввода-вывода и хочет дождаться, пока один из них не вернет определенное значение.   -  person Panagiotis Kanavos    schedule 09.05.2017
comment
@PanagiotisKanavos Ты прав. Я могу использовать Task.WhenAll, но почему я должен ждать завершения всех задач. Результат выполнения задачи с желаемым результатом может отменить все другие незавершенные задачи. Как это сделать?   -  person X X    schedule 09.05.2017
comment
@XX Вот почему вы хотите использовать потоковую обработку. Есть две библиотеки для потоковой обработки: Reactive Extensions и TPL Dataflow. Ниже приведен пример того, как вы можете добиться этого с помощью реактивных расширений.   -  person Aron    schedule 09.05.2017
comment
@Aron, я не слышал об этих двух библиотеках. Я должен пойти и посмотреть, как с ними работать и насколько они полезны :) tnx   -  person X X    schedule 09.05.2017
comment
@XX Если вы хотите ограничить количество HTTP-запросов в полете, вы можете использовать SemaphoreSlim.   -  person Aron    schedule 10.05.2017


Ответы (1)


Самый простой способ добиться того, что вам нужно (а не того, что вы хотите, потому что потоки - это зло). Использовать ReactiveExtensions.

var firstSensitive = await UrlList
                     .Select(async url => {
                         using(var http = new HttpClient()
                         {
                             var result = await hc.GetAsync(url);
                             return await result.Content.ReadAsStringAsync();
                         }
                     })
                     .SelectMany(downloadTask => downloadTask.ToObservable())
                     .Where(result => result.Contains("sensitive"))
                     .FirstOrDefaultAsync();

if(firstSensitive != null)
    Console.WriteLine("WARNING");

Чтобы ограничить количество одновременных HTTP-запросов:

int const concurrentRequestLimit = 4;
var semaphore = new SemaphoreSlim(concurrentRequestLimit);
var firstSensitive = await UrlList
                     .Select(async url => {
                         await semaphore.WaitAsync()
                         try
                         using(var http = new HttpClient()
                         {
                             var result = await hc.GetAsync(url);
                             return await result.Content.ReadAsStringAsync();
                         }
                         finally
                             semaphore.Release();
                     })
                     .SelectMany(downloadTask => downloadTask.ToObservable())
                     .Where(result => result.Contains("sensitive"))
                     .FirstOrDefaultAsync();

if(firstSensitive != null)
    Console.WriteLine("WARNING");
person Aron    schedule 09.05.2017