Как использовать IObservable/IObserver с ConcurrentQueue или ConcurrentStack

Я понял, что когда я пытаюсь обрабатывать элементы в параллельной очереди, используя несколько потоков, в то время как несколько потоков могут помещать в нее элементы, идеальным решением было бы использование реактивных расширений с параллельными структурами данных.

Мой оригинальный вопрос находится по адресу:

При использовании ConcurrentQueue попытка удаления из очереди во время цикла параллельно

Поэтому мне любопытно, есть ли способ получить запрос LINQ (или PLINQ), который будет постоянно удаляться из очереди по мере помещения в него элементов.

Я пытаюсь заставить это работать таким образом, чтобы у меня было n количество производителей, помещающих в очередь, и ограниченное количество потоков для обработки, поэтому я не перегружаю базу данных.

Если бы я мог использовать фреймворк Rx, я ожидаю, что смогу просто запустить его, и если 100 элементов будут помещены в течение 100 мс, то 20 потоков, являющихся частью запроса PLINQ, будут просто обрабатываться через очередь.

Я пытаюсь работать вместе с тремя технологиями:

  1. Rx Framework (реактивный LINQ)
  2. ПЛИНГ
  3. System.Collections.Concurrent структуры

person James Black    schedule 13.06.2010    source источник
comment
Можете ли вы уточнить, как вы ожидали, что Rx поможет вам здесь?   -  person Richard Szalay    schedule 10.11.2010
comment
@Richard Szalay - Как я уже упоминал в конце, я думаю, что мне не нужно опрашивать, чтобы увидеть, есть ли что-то в очереди, я мог бы просто реагировать, когда что-то помещается туда, поэтому, если большое количество элементов внезапно вставленный, я мог бы иметь несколько потоков, выполняющих обработку. Я пытаюсь избежать голосования, что я и делаю прямо сейчас.   -  person James Black    schedule 10.11.2010


Ответы (2)


Дрю прав, я думаю, что ConcurrentQueue, хотя это звучит идеально для работы, на самом деле является базовой структурой данных, которую использует BlockingCollection. Мне тоже кажется очень задним числом. Ознакомьтесь с главой 7 этой книги* 1?ie=UTF8&qid=1294319704&sr=8 -1" rel="nofollow">http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr

class Program
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}
1?ie=UTF8&qid=1294319704&sr=8-1, и он объяснит, как использовать BlockingCollection и иметь несколько производителей и несколько потребителей, каждый из которых снимает «очередь». Вы захотите взглянуть на метод «GetConsumingEnumerable()» и, возможно, просто вызвать для него .ToObservable().

*В остальном книга довольно посредственная.

редактировать:

Вот пример программы, которая, я думаю, делает то, что вы хотите?

class Program
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}
person Lee Campbell    schedule 06.01.2011

Я не знаю, как лучше всего это сделать с помощью Rx, но я бы рекомендовал просто использовать BlockingCollection<T> и шаблон производитель-потребитель. Ваш основной поток добавляет элементы в коллекцию, которая по умолчанию использует ConcurrentQueue<T> внизу. Затем у вас есть отдельный Task, который вы запускаете перед тем, который использует Parallel::ForEach вместо BlockingCollection<T> для одновременной обработки такого количества элементов из коллекции, которое имеет смысл для системы. Теперь вы, вероятно, также захотите изучить использование метода GetConsumingPartitioner библиотеки ParallelExtensions, чтобы быть наиболее эффективным, поскольку разделитель по умолчанию создаст больше накладных расходов, чем вы хотите в этом случае. Подробнее об этом можно прочитать в этой записи блога.

Когда основной поток завершен, вы вызываете CompleteAdding для BlockingCollection<T> и Task::Wait на Task, который вы развернули, чтобы дождаться, пока все потребители закончат обработку всех предметы в коллекции.

person Drew Marsh    schedule 01.12.2010
comment
Основная загвоздка в использовании BlockingCollection заключается в том, что потребляющий поток блокируется. Паттерн Observable будет запускать поток только тогда, когда есть что обрабатывать. - person Christopher Stevenson; 21.03.2014