Изучение реализации пула потоков - сигнальные события теряются при использовании autoresetevent

Я твердо верю в то, что нужно учиться заново. С таким настроением я решил реализовать собственный пул потоков. Задача, которую я ставил перед собой, была следующая:

  1. Чтобы иметь возможность ставить рабочие элементы в очередь в пуле потоков.
  2. Чтобы иметь возможность обрабатывать рабочие элементы с фиксированным количеством потоков - все они создаются одновременно.
  3. Функция общего рабочего потока должна знать только, как снимать очередь, и не должна иметь дело с другими функциями / свойствами, такими как IsEmpty или Count.

Мне удалось достичь вышеупомянутых целей, но я хочу проверить подход, который я использовал с экспертами по stackoverflow. Также хотелось бы узнать, есть ли подходы лучше или как эксперт по многопоточности решил бы эту проблему. В следующих абзацах упоминается о проблеме, с которой я столкнулся, и о том, как я ее решил.

Пул потоков, который я создал, внутренне поддерживал очередь рабочих элементов, из которой все рабочие потоки выбирали элемент, а затем обрабатывали его. Каждый раз, когда новый элемент ставится в очередь, он сигнализирует о событии, чтобы любой свободный поток мог его подобрать и выполнить.

Я начал с autoresetevent, чтобы сигнализировать ожидающим потокам о любых новых рабочих элементах в очереди, но я столкнулся с проблемой потери сигнальных событий. Это происходит, когда в очередь помещается более одного элемента и нет свободных потоков для обработки элемента. Общее количество элементов, которые остаются необработанными, такое же, как общее количество сигнальных событий, которые потеряны из-за перекрывающихся наборов (сигнальных) событий.

Чтобы решить проблему потери сигнальных событий, я создал оболочку поверх autoresetevent и использовал ее вместо autoresetevent. Исправлена ​​проблема. Вот список того же кода:

public static class CustomThreadPool
{
    static CustomThreadPool()
    {
        for (int i = 0; i < minThreads; i++)
            _threads.Add(
                new Thread(ThreadFunc) { IsBackground = true }
                );

        _threads.ForEach((t) => t.Start());
    }

    public static void EnqueWork(Action action)
    {
        _concurrentQueue.Enqueue(action);
        _enqueEvent.Set();
    }

    private static void ThreadFunc()
    {
        Action action = null;
        while (true)
        {
            _enqueEvent.WaitOne();
            _concurrentQueue.TryDequeue(out action);
            action();
        }
    }

    private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>();
    private static List<Thread> _threads = new List<Thread>();
    private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent();
    private static object _syncObject = new object();
    private const int minThreads = 4;
    private const int maxThreads = 10;

    public static void Test()
    {
        CustomThreadPool.EnqueWork(() => {

            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****First*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Second*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Third*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fourth*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fifth*****");
        });
    }
}

public class CountAutoResentEvent
{
    public void Set()
    {
        _event.Set();
        lock (_sync)
            _countOfSet++;
    }

    public void WaitOne()
    {
        _event.WaitOne();
        lock (_sync)
        {
            _countOfSet--;
            if (_countOfSet > 0)
                _event.Set();
        }
    }

    private AutoResetEvent _event = new AutoResetEvent(false);
    private int _countOfSet = 0;
    private object _sync = new object();
}

Теперь у меня несколько вопросов:

  1. Является ли мой подход полным доказательством?
  2. Какой механизм синхронизации лучше всего подходит для этой проблемы и почему?
  3. Как специалист по многопоточности справится с этой проблемой?

Спасибо.


person Anand Patel    schedule 17.12.2011    source источник
comment
@Hans Passant: На мой взгляд, это говорит о другом вопросе. И в этом вопросе несколько моментов.   -  person Tudor    schedule 17.12.2011


Ответы (1)


Судя по тому, что я видел, я бы сказал, что это правильно. Мне нравится, что вы использовали ConcurrentQueue и не реализовали собственную синхронизированную очередь. Это беспорядок и, скорее всего, будет не так быстро, как существующий.

Я хотел бы только отметить, что ваш настраиваемый «сигнальный механизм» на самом деле очень похож на семафор: блокировку, которая позволяет нескольким потокам входить в критическую секцию. Эта функция уже существует в классе Semaphore.

person Tudor    schedule 17.12.2011
comment
@Hans Passant: Он не сказал, что это неправильно. Он спросил, правильно это или нет. А почему семафор не правильный? В основном то, что он реализовал, - это событие со счетчиком. - person Tudor; 17.12.2011
comment
Я мог бы решить эту проблему с помощью ManualResetEvent и Semaphore (как предложил Тудор). Вопрос в том, верен ли мой подход? Во-вторых, есть ли простой или правильный подход / механизм к проблеме. Как бы вы решили ту же проблему, если бы вам пришлось? - person Anand Patel; 17.12.2011
comment
@ Ананд Патель: Я бы решил это так же, как и ты. Это кажется наиболее простым подходом для простого пула потоков, и вы не делаете никаких сложных вещей, которые могут вызвать у вас проблемы. Как я уже сказал, я, вероятно, также попытался бы использовать встроенный механизм сигнализации, чтобы уменьшить количество потенциальных ошибок, но в противном случае я, вероятно, сделал бы это так же, как вы. - person Tudor; 17.12.2011
comment
Ну, я бы сразу взял семафор. Если есть потокобезопасный счетчик (а здесь он есть), это почти определение семафора. Запрашивающая сторона нажимает на рабочий элемент и сигнализирует семафору, потоки пула ожидают семафора, а затем удаляют рабочий элемент из очереди - нет необходимости проверять какие-либо очереди «count» или «isEmpty». Пул потоков - это, по сути, очередь «производитель-потребитель» для рабочих объектов, очередь P-C «Computer Science 101» использует семафоры, так зачем вообще рассматривать для этого использование события? - person Martin James; 17.12.2011
comment
@Martin James - Я знаю семафор, но я пытался решить проблему с помощью только - Monitor, ManualResetEvent и AutoResetEvent. Я не понимал, что Semaphore лучше всего подходит для решения. Как было предложено вами и Тюдором, я попробовал семафор, и он работал абсолютно нормально без каких-либо настроек. Семафор кажется подходящим механизмом в этом случае. - person Anand Patel; 18.12.2011
comment
@Tudor: Отметьте ваш ответ как ответ. - person Anand Patel; 18.12.2011
comment
@ Ананд Патель: Я был рад вам помочь. :) - person Tudor; 18.12.2011