Parallel ForEach ждет 500 мс перед нерестом

У меня такая ситуация:

var tasks = new List<ITask> ...
Parallel.ForEach(tasks, currentTask => currentTask.Execute() );

Можно ли указать PLinq ждать 500 мс, прежде чем будет создан следующий поток?

System.Threading.Thread.Sleep(5000);

person cs0815    schedule 15.07.2013    source источник
comment
Чего вы пытаетесь достичь здесь?   -  person Daniel Hilgarth    schedule 15.07.2013
comment
Я бы предположил, что Parallel.ForEach(tasks, currentTask => { Thread.Sleep(5000); currentTask.Execute(); }); поможет, но мне было бы интересно узнать, почему вы пытаетесь это сделать - звучит как обходной путь для чего-то?   -  person James    schedule 15.07.2013
comment
Каждая задача, для выполнения которой требуется некоторое время, получает свои данные из другого ресурса, который может выполняться только каждые 0,5 секунды. Думаю, я мог бы разделить получение данных и выполнение задачи...   -  person cs0815    schedule 15.07.2013
comment
Нет, это может выйти из строя в любое время, не думайте, что это завершится в течение 500 мс, вы можете использовать WaitHandles здесь   -  person Sriram Sakthivel    schedule 15.07.2013
comment
Разделение было бы первым выбором здесь.   -  person Henk Holterman    schedule 15.07.2013
comment
Да только что понял это. Последняя реализация работает нормально. Спасибо!   -  person cs0815    schedule 15.07.2013


Ответы (4)


Вы используете Parallel.Foreach совершенно неправильно. Вам следует создать специальный Enumerator, который ограничивает скорость получения данных каждые 500 мс.

Я сделал некоторые предположения о том, как работает ваш DTO, поскольку вы не предоставили никаких подробностей.

private IEnumerator<SomeResource> GetRateLimitedResource()
{
    SomeResource someResource = null;
    do
    {
        someResource = _remoteProvider.GetData();

        if(someResource != null)
        {
             yield return someResource;
             Thread.Sleep(500);
        }
    } while (someResource != null);
}

вот как должен выглядеть ваш параллель

Parallel.ForEach(GetRateLimitedResource(), SomeFunctionToProcessSomeResource);
person Scott Chamberlain    schedule 15.07.2013
comment
Хорошая идея, потому что она простая, но мощная. - person usr; 15.07.2013
comment
Это ждет 500 мс, прежде чем будет получен первый элемент. Это намеренно? - person svick; 16.07.2013
comment
@svick Мне нужно было куда-то положить сон, я перешел туда после захвата данных, поэтому тогда он будет спать. Код можно улучшить, взяв DateTime.Now и сохранив его в локальной переменной, а затем, когда произойдет следующая итерация, вы сможете проверить, прошло ли 500 мс, и спать только необходимое количество времени. - person Scott Chamberlain; 16.07.2013
comment
Это, вероятно, не будет работать должным образом, потому что, насколько я знаю, Parallel.ForEach по умолчанию использует разбиение на фрагменты. Это означает, что постепенно будет перечисляться все больше и больше элементов за раз. Разбиение на фрагменты имеет смысл, когда желательно уменьшить накладные расходы на синхронизацию, но в этом случае накладные расходы полностью затмеваются искусственной задержкой. Поэтому имеет смысл отключить его, заменив GetRateLimitedResource() на Partitioner.Create(GetRateLimitedResource(), EnumerablePartitionerOptions.NoBuffering). - person Theodor Zoulias; 17.03.2021

Уже есть хорошие предложения. Я согласен с другими, что вы используете PLINQ не так, как предполагалось.

Я бы предложил использовать System.Threading.Timer . Это, вероятно, лучше, чем написание метода, который возвращает IEnumerable<>, вызывающий задержку в полсекунды, потому что вам может не понадобиться ждать все полсекунды, в зависимости от того, сколько времени прошло с момента вашего последнего вызова API.

С помощью таймера он будет вызывать делегата, который вы ему предоставили, с указанным вами интервалом, поэтому, даже если первая задача не будет выполнена, через полсекунды он вызовет вашего делегата в другом потоке, поэтому не будет быть любое дополнительное ожидание.

Судя по вашему примеру кода, у вас есть список задач, в этом случае я бы использовал System.Collections.Concurrent.ConcurrentQueue для отслеживания задач. Как только очередь опустеет, выключите таймер.

person Brian Ball    schedule 15.07.2013

Вместо этого вы можете использовать Enumerable.Aggregate.

var task = tasks.Aggregate((t1, t2) =>
                                t1.ContinueWith(async _ =>
                                    { Thread.Sleep(500); return t2.Result; }));

Если вы не хотите связывать задачи в цепочку, существует также перегрузка для Select при условии, что задачи расположены в порядке задержки.

var tasks = Enumerable
              .Range(1, 10)
              .Select(x => Task.Run(() => x * 2))
              .Select((x, i) => Task.Delay(TimeSpan.FromMilliseconds(i * 500))
                                    .ContinueWith(_ => x.Result));

foreach(var result in tasks.Select(x => x.Result))
{
    Console.WriteLine(result);
}

Из комментариев лучше было бы охранять ресурс вместо использования временной задержки.

static object Locker = new object();

static int GetResultFromResource(int arg)
{
    lock(Locker)
    {
        Thread.Sleep(500);
        return arg * 2;
    }
}

var tasks = Enumerable
          .Range(1, 10)
          .Select(x => Task.Run(() => GetResultFromResource(x)));

foreach(var result in tasks.Select(x => x.Result))
{
    Console.WriteLine(result);
}
person Dustin Kingen    schedule 15.07.2013
comment
Вид отвечает на вопрос, но удаляет все параллелизмы. - person usr; 15.07.2013
comment
Я добавил еще один метод, который будет работать одновременно. - person Dustin Kingen; 15.07.2013

В этом случае как насчет шаблона «Производитель-Потребитель» с BlockingCollection<T>?

var tasks = new BlockingCollection<ITask>();

// add tasks, if this is an expensive process, put it out onto a Task
// tasks.Add(x);

// we're done producin' (allows GetConsumingEnumerable to finish)
tasks.CompleteAdding();

RunTasks(tasks);

С одним потребительским потоком:

static void RunTasks(BlockingCollection<ITask> tasks)
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        task.Execute();

        // this may not be as accurate as you would like
        Thread.Sleep(500);
    }
}

Если у вас есть доступ к .Net 4.5, вы можете использовать Task.Delay:

static void RunTasks(BlockingCollection<ITask> tasks)
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        Task.Delay(500)
            .ContinueWith(() => task.Execute())
            .Wait();
    }
}
person user7116    schedule 15.07.2013