При использовании ConcurrentQueue попытка удаления из очереди при параллельном цикле

Я использую параллельные структуры данных в своем приложении .NET 4, и у меня есть ConcurrentQueue, который добавляется, пока я его обрабатываю.

Я хочу сделать что-то вроде:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

поскольку я делаю вызовы базы данных для сохранения данных, я ограничиваю количество одновременных потоков.

Но я ожидаю, что ForAll не будет исключен из очереди, и я беспокоюсь о том, чтобы просто сделать

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});

так как нет гарантии, что я вытащу правильный.

Итак, как я могу перебирать коллекцию и удалять из очереди параллельным образом.

Или было бы лучше использовать PLINQ для параллельной обработки?


person James Black    schedule 08.06.2010    source источник


Ответы (2)


Ну, я не уверен на 100%, что вы пытаетесь архивировать здесь. Вы пытаетесь просто удалить все элементы из очереди, пока ничего не останется? Или просто убрать из очереди множество предметов за один раз?

Первое, вероятно, неожиданное поведение начинается с этого утверждения:

 theQueue.AsParallel()

Для ConcurrentQueue вы получаете Snapshot-Enumerator. Поэтому, когда вы перебираете параллельный стек, вы перебираете только моментальный снимок, а не «живую» очередь.

В общем, я думаю, что не стоит повторять что-то, что вы меняете во время итерации.

Таким образом, другое решение будет выглядеть так:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

Это позволяет избежать манипуляций с чем-либо при повторении. Однако после этого оператора очередь все еще может содержать данные, которые были добавлены во время цикла for.

Чтобы сделать очередь пустой на данный момент времени, вам, вероятно, потребуется немного больше работы. Вот действительно уродливое решение. Пока в очереди есть еще элементы, создайте новые задачи. Каждая задача начинает удаляться из очереди до тех пор, пока это возможно. В конце ждем окончания всех задач. Чтобы ограничить параллелизм, мы никогда не создаем более 20 задач.

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);
person Gamlor    schedule 10.06.2010
comment
Я попробую среднюю идею, так как это может сработать. Это часть веб-сервиса, который может получить 10 тысяч обновлений за очень короткое время, по 1 за раз, но я не хочу забивать базу данных, пытаясь делать каждое обновление по мере их получения, поэтому я буду пихать их в статическая очередь в синглтоне, и функция выполняет обработку. Мое решение не идеально, но мне нужно защитить базу данных в первую очередь. - person James Black; 11.06.2010

Если вы стремитесь к высокому уровню на реальном сайте и вам не нужно делать немедленные обновления БД, вам будет намного лучше выбрать очень консервативное решение, а не дополнительные библиотеки слоев.

Создайте массив фиксированного размера (приблизительный размер - скажем, 1000 элементов или N секунд запросов) и блокируемый индекс, чтобы запросы просто помещали данные в слоты и возвращались. Когда один блок заполняется (продолжайте проверять подсчет), создайте еще один и создайте асинхронный делегат для обработки и отправки в SQL только что заполненного блока. В зависимости от структуры ваших данных, этот делегат может упаковать все данные в массивы, разделенные запятыми, может быть, даже простой XML (конечно, нужно проверить производительность этого) и отправить их в SQL sproc, который должен лучше всего обработать их запись по записи - никогда не держал большой замок. Если станет тяжело, вы можете разделить свой блок на несколько более мелких блоков. Главное, что вы минимизировали количество запросов к SQL, всегда сохраняли одну степень разделения и даже не платили цену за пул потоков — вам, вероятно, вообще не нужно будет использовать больше двух асинхронных потоков. .

Это будет намного быстрее, чем возиться с Parallel-ами.

person ZXX    schedule 27.08.2010
comment
Я надеялся, что очередь будет очищаться по мере ее заполнения, поэтому, если я получу сотни элементов в одном вызове веб-службы, а затем они отправят еще одну большую партию, она просто будет продолжать выполняться, пока внутри что-то есть. В данный момент я сплю 10 секунд, обрабатываю очередь, потом сплю еще 10 секунд. - person James Black; 28.08.2010