Boost Beast Async Server Failing with Assertion failed: (id_! = T :: id) при нескольких вызовах aync

Ошибка утверждения: (id_! = T :: id), функция try_lock, файл /usr/local/include/boost/beast/websocket/detail/stream_base.hpp, строка 91.

       // Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session>
{
    websocket::stream<tcp::socket> ws_;
    boost::asio::strand<
            boost::asio::io_context::executor_type> strand_;
    boost::beast::multi_buffer buffer_;

public:
    // Take ownership of the socket
    explicit
    session(tcp::socket socket)
            : ws_(std::move(socket))
            , strand_(ws_.get_executor())
    {
    }

    // Start the asynchronous operation
    void
    run()
    {
        // Accept the websocket handshake
        ws_.async_accept(
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_accept,
                                shared_from_this(),
                                std::placeholders::_1)));
    }

    void
    on_accept(boost::system::error_code ec)
    {
        std::cout << std::this_thread::get_id() <<" : DO ACCEPT" << std::endl;

        if(ec)
            return fail(ec, "accept");

        std::ifstream ifs("tweetsample.txt");
        if(ifs.good())
        {
            auto tweet = std::string{};
            while(std::getline(ifs,tweet))
            {
                try
                {
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    auto t = json::parse(tweet);
                    std::string tweet_text = t["text"];
                    auto n = boost::asio::buffer_copy(buffer_.prepare(tweet_text.size()), 
boost::asio::buffer(tweet_text));
                buffer_.commit(n);
                    do_write();
                }
                catch(nlohmann::detail::type_error& ex)
                {
                       std::cout << ex.what() << std::endl;
                }
            }
        }
    }

    void
    do_read()
    {
        // Read a message into our buffer
        ws_.async_read(
                buffer_,
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_read,
                                shared_from_this(),
                                std::placeholders::_1,
                                std::placeholders::_2)));
    }

    void do_write()
    {
        std::cout << std::this_thread::get_id() << "do_write" << std::endl;
        ws_.async_write(
                buffer_.data(),
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_write,
                                shared_from_this(),
                                std::placeholders::_1,
                                std::placeholders::_2)));
    }

    void
    on_read(
            boost::system::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        // This indicates that the session was closed
        if(ec == websocket::error::closed)
            return;

        if(ec)
            fail(ec, "read");

        std::cout << boost::beast::buffers(buffer_.data()) << std::endl;

        // Echo the message
        ws_.text(ws_.got_text());
        ws_.async_write(
                buffer_.data(),
                boost::asio::bind_executor(
                        strand_,
                        std::bind(
                                &session::on_write,
                                shared_from_this(),
                                std::placeholders::_1,
                                std::placeholders::_2)));
    }

    void
    on_write(
            boost::system::error_code ec,
            std::size_t bytes_transferred)
    {
        std::cout << "on_write" << std::endl;
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "write");

        // Clear the buffer
        buffer_.consume(buffer_.size());

        // Do another read
        do_read();
    }
};

//------------------------------------------------------------------------------

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
    tcp::acceptor acceptor_;
    tcp::socket socket_;

public:
    listener(
            boost::asio::io_context& ioc,
            tcp::endpoint endpoint)
            : acceptor_(ioc)
            , socket_(ioc)
    {
        boost::system::error_code ec;

        // Open the acceptor
        acceptor_.open(endpoint.protocol(), ec);
        if(ec)
        {
            fail(ec, "open");
            return;
        }

        // Allow address reuse
        acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
        if(ec)
        {
            fail(ec, "set_option");
            return;
        }

        // Bind to the server address
        acceptor_.bind(endpoint, ec);
        if(ec)
        {
            fail(ec, "bind");
            return;
        }

        // Start listening for connections
        acceptor_.listen(
                boost::asio::socket_base::max_listen_connections, ec);
        if(ec)
        {
            fail(ec, "listen");
            return;
        }
    }

    // Start accepting incoming connections
    void
    run()
    {
        if(! acceptor_.is_open())
            return;
        do_accept();
    }

    void
    do_accept()
    {

        acceptor_.async_accept(
                socket_,
                std::bind(
                        &listener::on_accept,
                        shared_from_this(),
                        std::placeholders::_1));
    }

    void
    on_accept(boost::system::error_code ec)
    {
        if(ec)
        {
            fail(ec, "accept");
        }
        else
        {
            // Create the session and run it
            std::make_shared<session>(std::move(socket_))->run();
        }

        // Accept another connection
        do_accept();
    }
};  


    int main(int argc, char* argv[])
    {
        auto const address = boost::asio::ip::make_address("XXX.XX.XX.X");
        auto const port = static_cast<unsigned short>(std::atoi("XXXX"));
        auto const threads = 1;

        // The io_context is required for all I/O
        boost::asio::io_context ioc{threads};

        // Create and launch a listening port
        std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();

        // Run the I/O service on the requested number of threads
        std::vector<std::thread> v;
        v.reserve(threads - 1);
        for(auto i = threads - 1; i > 0; --i)
            v.emplace_back(
                    [&ioc]
                    {
                        ioc.run();
                    });
        ioc.run();        
        return EXIT_SUCCESS;
    }

Я пытаюсь поэкспериментировать с websocket, в настоящее время, когда я получаю accept, я хочу отправить все данные, которые я прочитал из файла, построчно.

Программа вылетает после отправки первой строки. Посоветуйте, пожалуйста, что делаю не так.


person VikramChopde    schedule 08.09.2018    source источник
comment
Попытка увеличить количество потоков, из которых вызывается ioservice. Тем не менее проблема не устранена.   -  person VikramChopde    schedule 08.09.2018


Ответы (3)


У меня была такая же проблема, поэтому, изучив неудавшееся утверждение, я нашел этот комментарий:

    // If this assert goes off it means you are attempting to
    // simultaneously initiate more than one of same asynchronous
    // operation, which is not allowed. For example, you must wait
    // for an async_read to complete before performing another
    // async_read.
    //
    BOOST_ASSERT(id_ != T::id);
person Maxim Chetrusca    schedule 22.04.2019

    std::cout << "do_write" << std::endl;
    // Echo the message
    ws_.async_write(boost::asio::buffer(tweet),
                    boost::asio::bind_executor(strand_, std::bind(&session::on_write, shared_from_this(),
                                                                  std::placeholders::_1, std::placeholders::_2)));

При этом выполняется асинхронная операция с использованием локального буфера (аргумент tweet выходит за пределы области действия до того, как операция начнется / завершится).

Со стороны read вы решили эту проблему, используя data_, который является переменной-членом. Вам следует подумать о таком решении.

person sehe    schedule 08.09.2018
comment
Спасибо .. позвольте мне проверить это. Но почему локальная переменная, выходящая за пределы области видимости, должна давать утверждение, что идентификатор потока не равен Id. После прочтения некоторых документов он говорит, что обычно происходит при попытке выполнить две одновременные асинхронные операции. Разве здесь не так ..? - person VikramChopde; 08.09.2018
comment
Когда вы используете переменные по истечении их срока службы, это вызывает неопределенное поведение. Буквально все может случиться, и часто случаются неожиданные эффекты. Вы не можете рассуждать о программе, которая вызывает UB - person sehe; 08.09.2018
comment
Отредактировал фрагмент кода. Пытался изменить реализацию, чтобы использовать buffer_copy для внутреннего буфера. Все еще получаю то же утверждение. У меня такое чувство, что что-то идет не так. - person VikramChopde; 09.09.2018
comment
Не будет ли буферная копия также владеть содержимым? я имею в виду, что буферная копия не делает глубокую копию? Добрый совет . - person VikramChopde; 09.09.2018
comment
Одна вещь, которую я заметил, это то, что on_write не вызывается даже после записи данных, а другой do write вызывается перед ним. в каком кабине будет проблема. действительно запутался. - person VikramChopde; 09.09.2018

Это проблема параллелизма. Ответ упоминается здесь

Когда вы вызываете websocket :: stream :: async_write, вы должны дождаться завершения операции (вызывается ваш обработчик завершения) перед повторным вызовом async_write. Это объясняется здесь: https://www.boost.org/doc/libs/1_67_0/libs/beast/doc/html/beast/using_websocket/notes.html#beast.using_websocket.notes.thread_safety

Если вы хотите отправлять более одного сообщения за раз, вам необходимо реализовать собственную очередь записи. Пример можно найти здесь: person M.Hefny    schedule 15.04.2021