Шаблон push/pull ZeroMQ

Я решил написать тестовый код, чтобы посмотреть, как работает связка pusher - many pullers, и мои подозрения оправдались.

Пуллеры получают сообщения в порядке их подключения, например 1-е сообщение принимает 1-й подключенный пуллер, 2-й - 2-й и т.д. Я смоделировал ситуацию, когда один из пуллеров оставался занятым после получения сообщения, но когда пришло время получить сообщение, оно все равно стояло в очереди, поэтому я «потерял» сообщение. Плохо. Я хочу, чтобы это сообщение было получено следующим «бесплатным» съемником. Это реально?

Мой тестовый код. Я использую zmqpp в качестве привязки

void main()
{
    auto _socket = sIpcContext->CreateNewSocket(zmqpp::socket_type::push);
    _socket->bind("tcp://*:4242");


    for (auto i = 0; i < 3; ++i)
    {
        new std::thread([&](int _idx)
        {
            auto idx = _idx;
            auto sock = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull);
            sock->connect("tcp://127.0.0.1:4242");

            for (;;)
            {
                std::string msg;
                sock->receive(msg);
                std::cout << idx << " received: " << msg << std::endl;
                if (idx == 1)
                {
                    std::cout << "Puller 1 is now busy" << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(10000));
                }
            }
        }, i);
    }

    for (auto i = 0;; ++i)
    {
        _socket->send(std::to_string(i));
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

Я получаю этот вывод:

0 received: 0
0 received: 1
1 received: 2
Puller 1 is now busy
2 received: 3
0 received: 4
2 received: 6
0 received: 7
2 received: 9
0 received: 10
2 received: 12
0 received: 13
2 received: 15

Как видите, 5, 8 и т. д. «пропущены», но на самом деле поставлены в очередь в съемнике №1.


person Yaki Khadafi    schedule 07.05.2015    source источник


Ответы (2)


Да, push/pull-сокеты достаточно тупые, чтобы допустить это. Вы можете использовать другие сокеты, такие как маршрутизатор/дилер, для отправки работы бесплатному работнику.

person Rennex    schedule 10.05.2015

Руководство 0MQ объясняет этот случай (называет его аналогией с почтовым отделением):

Аналогия с почтой. Если у вас есть одна очередь на прилавок, и некоторые люди покупают марки (быстрая и простая транзакция), а некоторые люди открывают новые счета (очень медленная транзакция), то вы обнаружите, что покупатели марок несправедливо застревают в очередях. Как и в почтовом отделении, если ваша архитектура обмена сообщениями несправедлива, люди будут раздражаться.

Решение в почтовом отделении состоит в том, чтобы создать единую очередь, чтобы даже если один или два счетчика застряли с медленной работой, другие счетчики продолжали обслуживать клиентов в порядке очереди.

Короче говоря, вы должны использовать МАРШРУТИЗАТОР при работе с медленными рабочими процессами.

person marsop    schedule 02.05.2016