Я твердо верю в то, что нужно учиться заново. С таким настроением я решил реализовать собственный пул потоков. Задача, которую я ставил перед собой, была следующая:
- Чтобы иметь возможность ставить рабочие элементы в очередь в пуле потоков.
- Чтобы иметь возможность обрабатывать рабочие элементы с фиксированным количеством потоков - все они создаются одновременно.
- Функция общего рабочего потока должна знать только, как снимать очередь, и не должна иметь дело с другими функциями / свойствами, такими как IsEmpty или Count.
Мне удалось достичь вышеупомянутых целей, но я хочу проверить подход, который я использовал с экспертами по stackoverflow. Также хотелось бы узнать, есть ли подходы лучше или как эксперт по многопоточности решил бы эту проблему. В следующих абзацах упоминается о проблеме, с которой я столкнулся, и о том, как я ее решил.
Пул потоков, который я создал, внутренне поддерживал очередь рабочих элементов, из которой все рабочие потоки выбирали элемент, а затем обрабатывали его. Каждый раз, когда новый элемент ставится в очередь, он сигнализирует о событии, чтобы любой свободный поток мог его подобрать и выполнить.
Я начал с autoresetevent, чтобы сигнализировать ожидающим потокам о любых новых рабочих элементах в очереди, но я столкнулся с проблемой потери сигнальных событий. Это происходит, когда в очередь помещается более одного элемента и нет свободных потоков для обработки элемента. Общее количество элементов, которые остаются необработанными, такое же, как общее количество сигнальных событий, которые потеряны из-за перекрывающихся наборов (сигнальных) событий.
Чтобы решить проблему потери сигнальных событий, я создал оболочку поверх autoresetevent и использовал ее вместо autoresetevent. Исправлена проблема. Вот список того же кода:
public static class CustomThreadPool
{
static CustomThreadPool()
{
for (int i = 0; i < minThreads; i++)
_threads.Add(
new Thread(ThreadFunc) { IsBackground = true }
);
_threads.ForEach((t) => t.Start());
}
public static void EnqueWork(Action action)
{
_concurrentQueue.Enqueue(action);
_enqueEvent.Set();
}
private static void ThreadFunc()
{
Action action = null;
while (true)
{
_enqueEvent.WaitOne();
_concurrentQueue.TryDequeue(out action);
action();
}
}
private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>();
private static List<Thread> _threads = new List<Thread>();
private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent();
private static object _syncObject = new object();
private const int minThreads = 4;
private const int maxThreads = 10;
public static void Test()
{
CustomThreadPool.EnqueWork(() => {
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****First*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Second*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Third*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Fourth*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Fifth*****");
});
}
}
public class CountAutoResentEvent
{
public void Set()
{
_event.Set();
lock (_sync)
_countOfSet++;
}
public void WaitOne()
{
_event.WaitOne();
lock (_sync)
{
_countOfSet--;
if (_countOfSet > 0)
_event.Set();
}
}
private AutoResetEvent _event = new AutoResetEvent(false);
private int _countOfSet = 0;
private object _sync = new object();
}
Теперь у меня несколько вопросов:
- Является ли мой подход полным доказательством?
- Какой механизм синхронизации лучше всего подходит для этой проблемы и почему?
- Как специалист по многопоточности справится с этой проблемой?
Спасибо.