boost :: asio асинхронное условие

Идея состоит в том, чтобы иметь возможность заменить многопоточный код на boost :: asio и пул потоков в проблеме потребителя / производителя. В настоящее время каждый потребительский поток ожидает boost::condition_variable - когда производитель что-то добавляет в очередь, он вызывает _2 _ / _ 3_, чтобы уведомить всех потребителей. Что происходит, когда у вас (потенциально) более 1000 потребителей? Нити не масштабируются!

Я решил использовать boost::asio, но потом обнаружил, что в нем нет переменных состояния. И тогда родился async_condition_variable:

class async_condition_variable
{
private:
    boost::asio::io_service& service_;
    typedef boost::function<void ()> async_handler;
    std::queue<async_handler> waiters_;

public:
    async_condition_variable(boost::asio::io_service& service) : service_(service)
    {
    }

    void async_wait(async_handler handler)
    {
        waiters_.push(handler);
    }

    void notify_one()
    {
        service_.post(waiters_.front());
        waiters_.pop();
    }

    void notify_all()
    {
        while (!waiters_.empty()) {
            notify_one();
        }
    }
};

По сути, каждый потребитель будет звонить async_condition_variable::wait(...). Затем продюсер в конце концов позвонил бы async_condition_variable::notify_one() или async_condition_variable::notify_all(). Будет вызван дескриптор каждого потребителя, который либо будет действовать в соответствии с условием, либо снова вызовет async_condition_variable::wait(...). Возможно ли это, или я здесь сумасшедший? Какую блокировку (мьютексы) следует выполнять, учитывая тот факт, что она будет выполняться в пуле потоков?

P.S .: Да, это скорее RFC (запрос комментариев), чем вопрос :).


person bruno nery    schedule 09.07.2012    source источник
comment
Какой вариант использования? Вам нужно делать 1000 разных вещей, когда что-то происходит? Если так, то это совершенно неправильный путь. (Пул потоков должен пережевывать те 1000 заданий, которые теперь необходимо выполнить.)   -  person David Schwartz    schedule 10.07.2012
comment
У меня есть (потенциально большой) набор клиентов, которым необходимо иметь возможность получать задачи с центрального сервера. Большая часть этих клиентов будет находиться за брандмауэрами, поэтому идея состоит в том, чтобы соединения клиент-сервер всегда оставались открытыми. Эта «переменная асинхронного условия» необходима серверу для отправки задачи данному клиенту, когда внешний контроллер отправляет новую задачу.   -  person bruno nery    schedule 11.07.2012
comment
У меня такая же проблема. представьте, что вы реализуете сервер для APNS ... в APNS вы не получите ответа на свое отправленное сообщение, если нет ошибки. Также вы хотите, чтобы сокет оставался открытым, чтобы отправлять через него больше сообщений. Итак, у вас есть этот пул соединений, и им нужно получить следующее сообщение для отправки из пула. Естественно, для этого вы используете condition_variable. Но ожидание переменной условия приостановит ваш поток подключения, и поэтому вы не получите async_read для работы, если вы не используете ожидание с таймаутом, чтобы создать ситуацию почти занятого вращения, которая отстой.   -  person Alex Kremer    schedule 19.07.2013
comment
и вот мой результат (отлично работает): github.com/godexsoft/push_service/blob/master/src/push/detail/   -  person Alex Kremer    schedule 22.07.2013
comment
boost :: asio :: deadline_timer может делать то же самое. stackoverflow.com/questions/17005258/   -  person akakatak    schedule 13.09.2016


Ответы (3)


Составьте список того, что нужно сделать, когда произойдет какое-либо событие. Имейте функцию для добавления чего-либо в этот список и функцию для удаления чего-либо из этого списка. Затем, когда событие происходит, пусть пул потоков работает над списком заданий, которые теперь необходимо выполнить. Вам не нужны потоки, специально ожидающие события.

person David Schwartz    schedule 10.07.2012
comment
А как вызвать функцию, удаляющую что-то из списка? Обратите внимание, что я использую boost :: asio с пулом потоков, но количество потоков в пуле также может быть равно единице. - person bruno nery; 11.07.2012
comment
@brunonery: я не совсем уверен, что вы спрашиваете, но функция будет функцией-членом класса, который обрабатывает событие. Он получит блокировку, которая защищает список, а затем удалит элемент из списка. (Список может быть картой или массивом, в зависимости от того, как вы хотите это сделать. Когда вы добавляете запись в список, вы можете вернуть дескриптор, который будет использоваться для ее удаления. Есть и другие способы.) В качестве альтернативы, список может быть списком слабых указателей, поэтому достаточно просто уничтожить объект уведомления. (В каждом уведомлении удалите все мертвые указатели.) - person David Schwartz; 11.07.2012

Boost :: asio может быть сложно осмыслить. По крайней мере, мне трудно это делать.

Вам не нужно заставлять потоки ничего ждать. Они делают это сами, когда им нечего делать. Примеры, которые выглядели так, как вы хотели, были опубликованы в io_service для каждого элемента.

Следующий код был вдохновлен этой ссылкой. Это фактически открывает мне глаза на то, как вы могли бы использовать его для множества вещей.

Я уверен, что это не идеально, но я думаю, что это дает общее представление. Надеюсь, это поможет.

Код

#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
class ServerProcessor
{
protected:
    void handleWork1(WorkObject1* work)
    {
        //The code to do task 1 goes in here
    }
    void handleWork2(WorkObject2* work)
    {
        //The code to do task 2 goes in here
    }

    boost::thread_group worker_threads_;

    boost::asio::io_service io_service_;
    //This is used to keep io_service from running out of work and exiting to soon.
    boost::shared_ptr<boost::asio::io_service::work> work_;


public:
    void start(int numberOfThreads)
    {
        boost::shared_ptr<boost::asio::io_service::work> myWork(new boost::asio::io_service::work(io_service_));
        work_=myWork;

        for (int x=0; x < numberOfThreads; ++x)
            worker_threads_.create_thread( boost::bind( &ServerProcessor::threadAction, this ) );

    }

    void doWork1(WorkObject1* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork1, this, work));
    }

    void doWork2(WorkObject2* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork2, this, work));
    }


    void threadAction()
    {
        io_service_.run();
    }

    void stop()
    {
        work_.reset();
        io_service_.stop();
        worker_threads_.join_all();
    }

};

int main()
{
    ServerProcessor s;

    std::string input;
    std::cout<<"Press f to stop"<<std::endl;

    s.start(8);

    std::cin>>input;

    s.stop();

    return 0;
}
person Thomas Lann    schedule 19.11.2012
comment
Вы понимаете, что отвечаете на вопрос (оставшийся без ответа) четыре месяца назад, верно? И что у меня есть код, но просил прокомментировать выполнимость? В любом случае, спасибо за попытку :) - person bruno nery; 20.11.2012
comment
@brunonery Я действительно понял, что вопрос был задолго до этого. Но принятого ответа не было. Итак, я пытался помочь. Что касается кода, он сильно отличался от того, как я видел, как работают потребители asio. Итак, я попытался привести пример того, как это сделать с помощью того, что предоставляется. На самом деле вам не нужны переменные условия, чтобы делать то, что вы пытаетесь сделать. Я не думаю, что asio был разработан для такого использования. - person Thomas Lann; 20.11.2012

Как насчет использования boost :: signal2?

Это потокобезопасный побочный продукт boost :: signal, который позволяет вашим клиентам подписывать обратный вызов на отправляемый сигнал.

Затем, когда сигнал передается асинхронно в отправляемом задании io_service, все зарегистрированные обратные вызовы будут выполнены (в том же потоке, который испустил сигнал).

person Franco Miceli    schedule 13.10.2016