Реализация condition_variable для решения многопоточного ожидания занятости

Моя программа выводит на консоль несколько строк текста, используя незанятые рабочие потоки. Проблема, однако, заключается в том, что рабочие процессы не ждут завершения предыдущих рабочих потоков, прежде чем напечатать текст, в результате чего текст вставляется в текст другого рабочего потока, как показано на рисунке ниже:

введите здесь описание изображения

Мне нужно решить эту проблему, известную как проблема ожидания, с помощью std::condition_variable. Я попытался реализовать condition_variable в приведенном ниже коде на основе примера, найденного в этом link, и следующий вопрос о стеке с переполнением помог мне, но недостаточно из-за моих ограниченных знаний о C++ в целом. Так что, в конце концов, я только прокомментировал все обратно, и теперь я в недоумении.

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>

class ThreadPool; // forward declare
//std::condition_variable cv;
//bool ready = false;
//bool processed = false;

class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

class ThreadPool {
public:
    ThreadPool(size_t threads);
    template<class F> void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;

    std::mutex queue_mutex;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;
    while (true)
    {
        std::unique_lock<std::mutex> locker(pool.queue_mutex);
        //cv.wait(locker, [] {return ready; });

        if (pool.stop) return;
        if (!pool.tasks.empty())
        {
            task = pool.tasks.front();
            pool.tasks.pop_front();
            locker.unlock();
            //cv.notify_one();
            //processed = true;
            task();
        }
        else {
            locker.unlock();
            //cv.notify_one();
        }
    }
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
    stop = true; // stop all threads

    for (auto &thread : workers)
        thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    //cv.wait(lock, [] { return processed; });
    tasks.push_back(std::function<void()>(f));
    //ready = true;
}

int main()
{
    ThreadPool pool(4);

    for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

    std::cin.ignore();
    return 0;
}

person user3776022    schedule 02.02.2017    source источник
comment
stdout это shared resource способ не блокировать перед доступом к нему?   -  person Shmuel H.    schedule 02.02.2017
comment
Вы предлагаете избавиться от condition_variable и решить проблему, просто заблокировав стандартный вывод с помощью мьютекса? Потому что если да, то я бы хотел, но вынужден решать это с помощью condition_variable.   -  person user3776022    schedule 02.02.2017
comment
Проблема немного плохо построена, так как пул потоков, который требует, чтобы все его рабочие ждали друг друга, а не выполнялись последовательно, на самом деле просто очень сложный процедурный код. Было бы больше смысла, если бы они выполняли некоторые вычисления параллельно, а затем синхронизировались бы только при печати, вместо того, чтобы вся задача состояла исключительно из печати строки. Тем не менее, это можно сделать.   -  person Nicolas Holthaus    schedule 02.02.2017
comment
Я надеюсь, что это может быть, поскольку я в полной растерянности, как это сделать. Я бы просто использовал мьютекс, а также реструктурировал бы код, но в этом случае я должен сохранить большую часть структуры.   -  person user3776022    schedule 02.02.2017
comment
@ user3776022 Я знаю, комментарий отделен от ответа, который я составляю. Я просто хотел, чтобы вы знали, что, поскольку вопрос надуман, ответ тоже будет. Я не вижу в этом пример хорошей практики использования condition_variable. На самом деле, я принципиально отказываюсь помогать с заданиями, но это настолько глупо, что я нарушаю свое правило.   -  person Nicolas Holthaus    schedule 02.02.2017
comment
Эта проблема — обычное состояние гонки. Решение с помощью условных переменных возможно, но надумано и не нужно. Почему вы не можете просто использовать мьютекс там, где проблема, то есть вокруг печати операторов?   -  person n. 1.8e9-where's-my-share m.    schedule 02.02.2017
comment
@ user3776022 также спасибо за то, что вы новый пользователь и предоставили компиляцию MCVE. Без него вы бы не получили ответа.   -  person Nicolas Holthaus    schedule 02.02.2017


Ответы (2)


Вот рабочий образец:

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>
#include <atomic>

class ThreadPool; 

// forward declare
std::condition_variable ready_cv;
std::condition_variable processed_cv;
std::atomic<bool> ready(false);
std::atomic<bool> processed(false);

class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

class ThreadPool {
public:
    ThreadPool(size_t threads);
    template<class F> void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;

    std::mutex queue_mutex;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;

    // in real life you need a variable here like while(!quitProgram) or your
    // program will never return. Similarly, in real life always use `wait_for`
    // instead of `wait` so that periodically you check to see if you should
    // exit the program
    while (true)
    {
        std::unique_lock<std::mutex> locker(pool.queue_mutex);
        ready_cv.wait(locker, [] {return ready.load(); });

        if (pool.stop) return;
        if (!pool.tasks.empty())
        {
            task = pool.tasks.front();
            pool.tasks.pop_front();
            locker.unlock();
            task();
            processed = true;
            processed_cv.notify_one();
        }
    }
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
    stop = true; // stop all threads

    for (auto &thread : workers)
        thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    tasks.push_back(std::function<void()>(f));
    processed = false;
    ready = true;
    ready_cv.notify_one();
    processed_cv.wait(lock, [] { return processed.load(); });
}

int main()
{
    ThreadPool pool(4);

    for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

    std::cin.ignore();
    return 0;
}

Выход:

Text printed by worker 0 
Text printed by worker 1 
Text printed by worker 2 
Text printed by worker 3 
Text printed by worker 4 
Text printed by worker 5 
Text printed by worker 6 
Text printed by worker 7

Почему бы не сделать это в рабочем коде

Так как назначение состоит в том, чтобы печатать строки по порядку, этот код нельзя на самом деле распараллелить, поэтому мы придумали способ заставить его работать полностью последовательно, используя необходимые Золотой молот из std::condition_variable. Но, по крайней мере, мы избавились от этого чертова занятого ожидания!

В реальном примере вы хотели бы обрабатывать данные или выполнять задачи параллельно и синхронизировать только вывод, и эта структура по-прежнему не является правильным подходом к этому, если бы вы делали это из царапать.

Что я изменил и почему

Я использовал атомарные логические значения для условий, потому что они имеют детерминированное поведение при совместном использовании несколькими потоками. Не обязательно во всех случаях, но, тем не менее, хорошая практика.

Вы должны включить условие выхода в цикл while(true) (например, флаг, установленный обработчиком SIGINT или чем-то еще), иначе ваша программа никогда не завершится. Это просто задание, поэтому мы его пропустили, но этим очень важно не пренебрегать в производственном коде.

Возможно, присваивание можно решить с помощью одной условной переменной, но я не уверен в этом, и в любом случае лучше использовать две, потому что намного понятнее и понятнее, что делает каждая из них. По сути, мы ждем задачи, затем просим поставленную в очередь подождать, пока она не будет выполнена, а затем сообщаем, что она фактически обработана, и мы готовы к следующей. Изначально вы были на правильном пути, но я думаю, что с двумя резюме более очевидно, что пошло не так.

Кроме того, важно установить переменные условия (ready и processed) перед использованием notify().

Я удалил locker.unlock() за ненадобностью. C++ стандартные блокировки представляют собой структуры RAII, поэтому блокировка будет разблокирована, когда выйдет за пределы области действия, что в основном является следующей строкой. В общем, лучше избегать бессмысленных ветвлений, поскольку вы делаете свою программу ненужной с сохранением состояния.

Педагогический бред...

Теперь, когда проблема рассмотрена и решена, у меня есть несколько вещей, которые, как мне кажется, нужно сказать о задании в целом и которые, я думаю, будут более важными для вашего обучения, чем решение проблемы, как указано.

Если вы были сбиты с толку или разочарованы заданием, хорошо, так и должно быть. Понятно, что вам трудно вставить квадратный колышек в круглое отверстие, и я думаю, что реальная ценность в этой проблеме заключается в том, чтобы научиться определять, когда вы используете правильный инструмент для правильной работы, а когда нет. .

Переменные условия являются подходящим инструментом для решения проблемы с циклом занятости, однако это назначение (как указал @n.m.) является простым условием гонки. Тем не менее, это всего лишь простое состояние гонки, потому что оно включает в себя ненужный и плохо реализованный пул потоков, что делает проблему сложной и трудной для понимания без всякой цели. И, тем не менее, std::async в любом случае следует предпочесть ручным пулам потоков в современном С++ (это и проще реализовать правильно, и более производительно на многих платформах, и не требует кучи глобальных и синглтонов и исключительно выделенных ресурсов) .

Если бы это было задание от вашего босса, а не от профессора, вы бы сдали следующее:

for(int i = 0; i < 8; ++i)
{
    std::cout << "Text printed by worker " << i << std::endl;
}

Эта проблема решается (оптимально) простым циклом for. Проблемы с ожиданием/блокировкой являются результатом ужасного дизайна, и "правильный" поступок - исправить дизайн, а не перевязывать его. Я даже не думаю, что задание является поучительным, потому что нет никакого возможного способа или причины для распараллеливания вывода, поэтому это просто сбивает с толку всех, включая сообщество SO. Похоже на отрицательное обучение, когда потоки просто вносят ненужную сложность, не улучшая вычислений.

На самом деле трудно сказать, хорошо ли сам профессор понимает концепции многопоточности и условных переменных, исходя из структуры задания. Задания по необходимости приходится сокращать, упрощать и несколько упрощать в учебных целях, но на самом деле это противоположно тому, что было сделано здесь, когда из простой задачи делалась сложная.

Как правило, я никогда не отвечаю на вопросы, связанные с домашним заданием по SO, потому что я думаю, что раздача ответов мешает обучению, и что самый ценный навык разработчика — это научиться биться головой о стену, пока в него не войдет идея. Однако от надуманных заданий, подобных этому, нельзя получить ничего, кроме отрицательного обучения, и хотя в школе приходится играть по правилам профессора, важно научиться распознавать надуманные проблемы, когда вы их видите, анализировать их и приходить к решению. простое и правильное решение.

person Nicolas Holthaus    schedule 02.02.2017
comment
Это задание используется для обучения новичков C++11. Как вы думаете, я должен сообщить учителю об улучшении исходного кода? Причина, по которой я спрашиваю, в том, что все говорили мне, что это плохой способ сделать это по-моему, и все они указывали на код, предоставленный учителем. Мне казалось, что большинство заданий были излишне сложными, и я даже переписал несколько с нуля. - person user3776022; 02.02.2017
comment
@ user3776022 забавно, что вы должны спросить, я собирался добавить раздел «педагогические разглагольствования», где подробно описываю, что не так с заданием, по моим собственным катарсическим причинам. Но я бы сказал нет, если только вы не думаете, что он из тех, кто действительно открыт для критики, или в лучшем случае укажете ему на критику ТАК. Главное, вы научитесь мыслить критически и использовать правильный инструмент для правильной работы, а не принимать близко к сердцу негативные тренировки, подобные этому. Однако нет смысла подвергать опасности свои оценки или репутацию, споря из-за глупых заданий. - person Nicolas Holthaus; 02.02.2017

Я думаю, что это нормально, так как мьютекс не блокируется перед печатью. Для каждого хода в цикле нет гарантии, что i будет напечатано раньше, чем i+1.

Для хорошего приоритета печати вы должны отображать сообщения после блокировки мьютекса функции enqueue.

person ben soft    schedule 02.02.2017
comment
К сожалению, это задание с конкретным условием, которое я реализую для condition_variable. Я уже исправил предыдущее задание с помощью мьютекса, поэтому я не могу сдать два задания с одним и тем же решением и ожидать, что оно пройдет. - person user3776022; 02.02.2017