Дилер ZeroMQ - большая задержка при обращении к дилеру по сравнению с winsock

Моя компания изучает возможность использования ZeroMQ в качестве транспортного механизма. Сначала я проверил производительность, чтобы понять, с чем я играю.

Итак, я создал приложение, сравнивающее настройку zmq «дилер-дилер» с winsock. Я заверил, что отправка синхронных сообщений от клиента на сервер и затем вычисление среднего значения обходятся мне в одно и то же время.

Вот сервер, на котором запущен winsock:

DWORD RunServerWINSOCKTest(DWORD dwPort)
{
    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        printf("WSAStartup failed with error: %d\n", iRet);
        return iRet;
    }

    struct addrinfo hints;
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    struct addrinfo *result = NULL;
    iRet = getaddrinfo(NULL, std::to_string(dwPort).c_str(), &hints, &result);
    if (iRet != 0)
    {
        WSACleanup();
        return iRet;
    }

    SOCKET ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
    if (ListenSocket == INVALID_SOCKET)
    {
        freeaddrinfo(result);
        WSACleanup();
        return WSAGetLastError();
    }

    iRet = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
    if (iRet == SOCKET_ERROR)
    {
        freeaddrinfo(result);
        closesocket(ListenSocket);
        WSACleanup();
        return WSAGetLastError();
    }

    freeaddrinfo(result);
    iRet = listen(ListenSocket, SOMAXCONN);
    if (iRet == SOCKET_ERROR)
    {
        closesocket(ListenSocket);
        WSACleanup();
        return WSAGetLastError();
    }

    while (true)
    {
        SOCKET ClientSocket = accept(ListenSocket, NULL, NULL);
        if (ClientSocket == INVALID_SOCKET)
        {
            closesocket(ListenSocket);
            WSACleanup();
            return WSAGetLastError();
        }
        char value = 0;
        setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));

        char recvbuf[DEFAULT_BUFLEN];
        int recvbuflen = DEFAULT_BUFLEN;
        do {

            iRet = recv(ClientSocket, recvbuf, recvbuflen, 0);
            if (iRet > 0) {
            // Echo the buffer back to the sender
                int iSendResult = send(ClientSocket, recvbuf, iRet, 0);
                if (iSendResult == SOCKET_ERROR)
                {
                    closesocket(ClientSocket);
                    WSACleanup();
                    return WSAGetLastError();
                }
            }
            else if (iRet == 0)
                printf("Connection closing...\n");
            else  {
                closesocket(ClientSocket);
                WSACleanup();
                return 1;
            }

        } while (iRet > 0);

        iRet = shutdown(ClientSocket, SD_SEND);
        if (iRet == SOCKET_ERROR)
        {
            closesocket(ClientSocket);
            WSACleanup();
            return WSAGetLastError();
        }
        closesocket(ClientSocket);
    }
    closesocket(ListenSocket);

    return WSACleanup();
}

Вот клиент, на котором запущен winsock:

DWORD RunClientWINSOCKTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        return iRet;
    }

    SOCKET ConnectSocket = INVALID_SOCKET;
    struct addrinfo *result = NULL,  *ptr = NULL, hints;


    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    int iResult = getaddrinfo(strAddress.c_str(), std::to_string(dwPort).c_str(), &hints, &result);
    if (iResult != 0) {
        WSACleanup();
        return 1;
    }

    for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {
        ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (ConnectSocket == INVALID_SOCKET) {
            WSACleanup();
            return 1;
        }

        iResult = connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (iResult == SOCKET_ERROR) {
            closesocket(ConnectSocket);
            ConnectSocket = INVALID_SOCKET;
            continue;
        }
        break;
    }

    freeaddrinfo(result);

    if (ConnectSocket == INVALID_SOCKET) {
        WSACleanup();
        return 1;
    }


    // Statistics
    UINT64 uint64BytesTransmitted = 0;
    UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
    UINT64 uint64WaitForResponse = 0;

    DWORD dwMessageCount = 1000000;

    CHAR cRecvMsg[DEFAULT_BUFLEN];
    SecureZeroMemory(&cRecvMsg, DEFAULT_BUFLEN);

    std::string strSendMsg(dwMessageSize, 'X');

    for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
    {
        int iRet = send(ConnectSocket, strSendMsg.data(), strSendMsg.size(), 0);
        if (iRet == SOCKET_ERROR) {
            closesocket(ConnectSocket);
            WSACleanup();
            return 1;
        }
        uint64BytesTransmitted += strSendMsg.size();

        UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
        iRet = recv(ConnectSocket, cRecvMsg, DEFAULT_BUFLEN, 0);
        if (iRet < 1)
        {
            closesocket(ConnectSocket);
            WSACleanup();
            return 1;
        }
        std::string strMessage(cRecvMsg);

        if (strMessage.compare(strSendMsg) == 0)
        {
            uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
        }
        else
        {
            return NO_ERROR;
        }
}

    UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
    PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);

    iResult = shutdown(ConnectSocket, SD_SEND);
    if (iResult == SOCKET_ERROR) {
        closesocket(ConnectSocket);
        WSACleanup();
        return 1;
    }
    closesocket(ConnectSocket);
    return WSACleanup();
}

Вот сервер под управлением ZMQ (дилер)

DWORD RunServerZMQTest(DWORD dwPort)
{
    try
    {
        zmq::context_t context(1);
        zmq::socket_t server(context, ZMQ_DEALER);

        // Set options here
        std::string strIdentity = s_set_id(server);
        printf("Created server connection with ID: %s\n", strIdentity.c_str());

        std::string strConnect = "tcp://*:" + std::to_string(dwPort);
        server.bind(strConnect.c_str());

        bool bRunning = true;
        while (bRunning)
        {
            std::string strMessage = s_recv(server);

            if (!s_send(server, strMessage))
            {
                return NO_ERROR;
            }
        }
    }
    catch (zmq::error_t& e)
    {
        return (DWORD)e.num();
    }

return NO_ERROR;

}

Вот клиент под управлением ZMQ (дилер)

DWORD RunClientZMQTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    try
    {
        zmq::context_t ctx(1);
        zmq::socket_t client(ctx, ZMQ_DEALER); // ZMQ_REQ

        // Set options here
        std::string strIdentity = s_set_id(client);

        std::string strConnect = "tcp://" + strAddress + ":" + std::to_string(dwPort);
        client.connect(strConnect.c_str());

        if(s_send(client, "INIT"))
        {
            std::string strMessage = s_recv(client);
            if (strMessage.compare("INIT") == 0)
            {
                printf("Client[%s] connected to: %s\n", strIdentity.c_str(), strConnect.c_str());
            }
            else
            {
                return NO_ERROR;
            }
        }
        else
        {
            return NO_ERROR;
        }


        // Statistics
        UINT64 uint64BytesTransmitted   = 0;
        UINT64 uint64StartTime          = s_TimeStampGenerator.GetHighResolutionTimeStamp();
        UINT64 uint64WaitForResponse    = 0;

        DWORD dwMessageCount = 10000000;


        std::string strSendMsg(dwMessageSize, 'X');
        for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
        {
            if (s_send(client, strSendMsg))
            {
                uint64BytesTransmitted += strSendMsg.size();

                UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
                std::string strRecvMsg = s_recv(client);
                if (strRecvMsg.compare(strSendMsg) == 0)
                {
                    uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
                }
                else
                {
                    return NO_ERROR;
                }
            }
            else
            {
                return NO_ERROR;
            }
        }
        UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
        PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
    }
    catch (zmq::error_t& e)
    {
        return (DWORD)e.num();
    }

    return NO_ERROR;
    }

Я запускаю тест локально с размером сообщения 5 байт и получаю следующий результат:

WINSOCK

Messages sent:                 1 000 000
Time elapsed (us):            48 019 415
Time elapsed (s):                     48.019 415
Message size (bytes):                  5
Msg/s:                            20 825
Bytes/s:                         104 125
Mb/s:                                  0.099
Total   response time (us):   24 537 376
Average repsonse time (us):           24.0

и

ZeroMQ

Messages sent:                 1 000 000
Time elapsed (us):           158 290 708
Time elapsed (s):                    158.290 708    
Message size (bytes):                  5
Msg/s:                             6 317
Bytes/s:                          31 587
Mb/s:                                  0.030
Total   response time (us):  125 524 178    
Average response time (us):          125.0

Может ли кто-нибудь объяснить, почему при использовании ZMQ среднее время отклика намного выше?

Цель состоит в том, чтобы найти установку, в которой я могу отправлять и получать сообщения асинхронно без необходимости отвечать. Если это может быть достигнуто с помощью другой настройки, чем дилер-дилер, пожалуйста, дайте мне знать!


person rhedin    schedule 07.10.2014    source источник
comment
Я быстро просмотрел веб-сайт 0MQ и нашел в основном коммерческую поддержку и отслеживание ошибок, для которых этот вопрос не подходит (тем более, что вы все еще исследуете). Это сложный вопрос, потому что добавление некоторого стека поверх сокетов, как ожидается, приведет к увеличению задержки, но это довольно много. Основное различие в тесте - это использование std :: string, которое может иметь значение (но не настолько). Кроме того, убедитесь, что в режиме выпуска включены флаги оптимизации компилятора, чтобы в тесте не участвовали дополнительные проверки.   -  person stefaanv    schedule 07.10.2014
comment
Я использую одни и те же флаги оптимизации компилятора для обеих реализаций, поэтому это должно быть проблемой. Строка std :: string создается только один раз, затем данные ссылаются на несколько раз, так что это не может быть так. Спасибо за вклад   -  person rhedin    schedule 07.10.2014
comment
Существуют тесты, которые сравнивают динамические массивы с векторами и показывают, что векторы намного медленнее с теми же флагами оптимизации, но оказалось, что с этими флагами были выполнены дополнительные проверки для векторов, которые не использовались в режиме выпуска с включенной оптимизацией. Строка каждый раз получает сервер и клиент и отправляет обратно сервером. Даже если строка находится в стеке, ее фактические данные могут быть динамическими.   -  person stefaanv    schedule 07.10.2014
comment
Имейте в виду, что я не ожидаю, что std :: string будет иметь большое значение по сравнению с обработкой стека и сетевым трафиком. Флаги могли. И, кстати, вы уже смотрели трафик сетевым монитором вроде wirehark?   -  person stefaanv    schedule 07.10.2014
comment
Хорошо, поэтому сначала я меняю флаг оптимизации на Полная оптимизация, что дало мне улучшение как для winsock (Среднее время отклика (us): 17,0), так и zmq (Среднее время отклика (us): 111,0). Я также использую массивы символов в одних и тех же местах для обеих реализаций, но это не имело значения. Я думаю, что увеличенная задержка связана с тем, что сообщения ставятся в очередь перед отправкой. Есть мысли по этому поводу?   -  person rhedin    schedule 08.10.2014
comment
Вы также упомянули об использовании wirehark, это что-то показало? Возможна постановка сообщений в очередь. Чтобы это проверить, вам нужен кто-то с основательными знаниями ZeroMQ. Суть в том, что, конечно, для простых случаев добавление стеков и библиотек обычно увеличивает задержки, даже если то, что вы тестировали, кажется больше, чем ожидалось. Основные вопросы: 1. Допустима ли дополнительная задержка? 2. Стоит ли дополнительное преимущество облегчения обработки трафика дополнительной задержки? 3. Разница в задержке лучше в реальных жизненных ситуациях?   -  person stefaanv    schedule 08.10.2014


Ответы (3)


Это лишь своего рода ответ на небольшую часть вашего вопроса, но здесь ...

Зачем вам дилер / дилер? Я предполагаю, потому что связь может начаться с любой точки? Вы не привязаны к дилеру / дилеру, в частности, это ограничивает вас только двумя конечными точками. Если вы когда-нибудь добавите еще одну конечную точку с любой стороны связи, скажем, второго клиента, то каждый клиент будет получать только половину сообщений, потому что дилер работает строго по круговому алгоритму.

Что вам нужно для асинхронной связи, так это комбинация розеток дилеров и / или маршрутизаторов. Ни то, ни другое не требует ответа, основные различия заключаются в том, как они выбирают, какому подключенному узлу отправить сообщение:

  • Дилер, как сказано, строго по круговой системе, он пошлет каждому подключенному одноранговому узлу последовательно
  • Маршрутизатор - это строго адресное сообщение, вы должны знать «имя» однорангового узла, которому вы хотите отправить сообщение, чтобы получить это сообщение.

Эти два типа сокетов работают вместе, потому что сокеты дилеров (и сокеты запросов, дилер - это сокет типа запроса) отправляют свое «имя» как часть сообщения, которое сокет маршрутизатора может использовать для обратной отправки данных. Это парадигма запроса / ответа, и вы увидите, что такая парадигма применяется во всех примерах в руководство, но вы можете приспособить эту парадигму к тому, что ищете, в частности, ни дилер, ни маршрутизатор не требуют ответа.

Не зная ваших полных требований, я не могу сказать вам, какую архитектуру ZMQ я бы выбрал, но в целом я предпочитаю расширяемость сокетов маршрутизатора, легче справиться с соответствующей адресацией, чем втиснуть все в один узел ... вы увидите предупреждения против использования маршрутизатора / маршрутизатора, и я согласен с ними в той степени, в которой вы должны понимать, что вы делаете, прежде чем попробовать, но понимание того, что вы делаете, реализация не так уж и сложна.


У вас также есть возможность, если это соответствует вашим требованиям, настроить каждый конец с сокетом pub и каждый с вспомогательным сокетом, если буквально нет ответов когда-либо. Если это строго поток данных от источника к цели, и ни один из партнеров не нуждается в обратной связи о том, что он отправляет, то это, вероятно, лучший выбор, даже если это означает, что вы имеете дело с двумя сокетами на каждом конце, а не с одним.


Ничто из этого не затрагивает производительность напрямую, но важно понимать, что сокеты zmq оптимизированы для конкретных случаев использования, и, как указано в ответе Джона Джеффери, вы нарушаете этот вариант использования для своего дилерского сокета, создавая обмен сообщениями в ваш тест строго синхронный. Первое, с чего нужно начать, - это доработать архитектуру ZMQ, а затем смоделировать реальный поток сообщений, в частности, без добавления произвольных ожиданий и синхронности, что обязательно изменит способ пропускная способность выглядит так, как вы ее тестируете, в значительной степени по определению.

person Jason    schedule 08.10.2014
comment
Спасибо за исчерпывающие ответы! Дизайн, над которым я работаю, должен поддерживать асинхронный обмен сообщениями в обоих направлениях. У меня есть рабочее решение, но производительность примерно на 50% хуже, чем у моего решения winsock (IOCP). Это побудило меня углубиться в реализацию, чтобы найти потенциальные узкие места, которые привели меня к этому модульному тесту. Причина, по которой я задал этот вопрос, заключалась в том, чтобы получить некоторые рекомендации о том, что делать, а что не делать. Я прислушусь к вашему совету и изучу дизайн маршрутизатора-маршрутизатора, надеюсь, он даст лучшую производительность! Еще раз спасибо - person rhedin; 09.10.2014
comment
Я бы не стал предполагать, что маршрутизатор / маршрутизатор обеспечит лучшую производительность, в частности, если ваше общение гарантированно будет индивидуальным, тогда дилер / дилер будет намного проще, и если это будет гарантировано быть «многие к одному», тогда дилер / маршрутизатор также будет намного проще. Если это может быть много-ко-многим, тогда маршрутизатор / маршрутизатор может иметь смысл или конструкция с несколькими сокетами может иметь больше смысла, в зависимости от особенностей вашей ситуации. В любом случае важно помнить о дилере и маршрутизаторе, что они оптимизированы для асинхронной связи, старайтесь не нарушать это. - person Jason; 09.10.2014
comment
Я понимаю вашу точку зрения, но еще кое-что. Я также пробовал то же приложение, но с REQ-REP (который больше подходит для синхронной передачи, не так ли?), Но оно тоже не дало никаких улучшений. Что вы думаете об этом или какой тип сокета лучше использовать для синхронной передачи? Еще раз спасибо за вклад - person rhedin; 10.10.2014
comment
Я совсем не удивлен, что REQ-REP дает такую ​​же производительность в этом тесте. Типы сокетов REQ / REP основаны на DEALER / ROUTER, но с той существенной разницей, что они обеспечивают синхронный обмен сообщениями. Поскольку ваш тестовый код уже обеспечивает синхронный обмен сообщениями, следовательно, производительность будет такой же. Как правило, приложения хотят выполнять асинхронный обмен сообщениями для той или иной функции; чистый REQ / REP для обучения / новичков :-). - person John Jefferies; 10.10.2014
comment
Да, как сказал @JohnJefferies, проблемы с производительностью связаны с шаблоном обмена сообщениями, а не с типами сокетов. Вы увидите, каковы истинные пределы производительности ZMQ, только используя асинхронный обмен сообщениями. Причина, по которой я говорю о типах сокетов, заключается в том, что определенные типы сокетов не будут препятствовать этому. Синхронный обмен сообщениями по определению больше касается структуры диалога (запрос-ответ-запрос-ответ), чем его скорости. С менее самоуверенным протоколом (например, winsock) вы сохраните более высокий уровень скорости, но он все равно не приблизится к тому, что возможно с помощью async. - person Jason; 10.10.2014

Вы говорите, что хотите отправлять и получать сообщения асинхронно без необходимости отвечать. Тем не менее, все проведенные до сих пор тесты являются полностью синхронными, по сути, запрос-ответ, но на розетке дилера-дилера. Что-то там не вычисляется. Почему бы не провести тесты, которые более точно имитируют дизайн, к которому вы стремитесь?

ZeroMQ получает значительную часть своей производительности «быстрее, чем TCP», объединяя сообщения из очереди в одно сообщение. Очевидно, что этот механизм не может быть задействован в чисто синхронной схеме, когда одновременно передается только одно сообщение.

Что касается того, почему этот конкретный тест, когда очень маленькие сообщения отправляются и принимаются чисто синхронно, он относительно медленный, я не могу сказать. Вы сделали профилирование? Я снова скажу, что запуск этого теста и принятие на нем решений не имеет смысла, если он не похож на ваш окончательный проект.

Странно выглядит блок try / catch в коде ZeroMQ. Это выглядит нечестно, потому что тест winsock был написан иначе. Известно, что в try / catch есть / были изрядные накладные расходы.

person John Jefferies    schedule 07.10.2014
comment
Причина, по которой я выполняю этот конкретный тест, заключается в том, что мне нужно использовать настройку дилер-дилер для асинхронного обмена сообщениями (насколько я знаю), но я хочу выяснить накладные расходы. У нас есть рабочее решение, в котором мы отправляем сообщения асинхронно, но у нас также низкая производительность, поэтому я провожу низкоуровневые тесты. Я попытаюсь удалить try / catch, в настоящее время я работаю над профилированием и смотрю трафик с помощью wirehark. - person rhedin; 08.10.2014
comment
Убрал попытку / уловку, без разницы - person rhedin; 08.10.2014

Проблема OP связана с пропускной способностью, а не с задержкой, и, вероятно, связана с шаблоном, который используется в приведенных примерах. Однако вы, вероятно, всегда обнаружите, что ZeroMQ имеет более высокую задержку, что я объясню, хотя это может быть бесполезно для OP в этой ситуации.

ZeroMQ работает путем буферизации сообщений. Представьте (в качестве базовой иллюстрации) создание std::string и добавление к нему множества небольших строк (многие тысячи, каждая из которых включает небольшой заголовок, чтобы узнать размер этих небольших сегментов), а затем отправку этой более крупной строки в интервалы 100us, 1000us, 10ms или любые другие. На принимающей стороне принимается большая строка, и каждое меньшее сообщение удаляется по одному в зависимости от размера заголовка, который отправляется вместе с ним. Это позволяет вам потенциально отправлять миллионы сообщений пакетами (хотя std::string, очевидно, плохой выбор) без накладных расходов на отправку этих миллионов очень маленьких измерений по одному. В результате вы в полной мере используете свои сетевые ресурсы и увеличиваете пропускную способность, а также создаете базовое FIFO поведение. Однако вы также создаете задержку, позволяющую заполнить буфер, что означает увеличение задержки.

Представьте (опять же, просто в качестве базовой иллюстрации): если вы потратите полсекунды (включая строковые операции и т. Д.) На буферизацию миллиона сообщений, это приведет к увеличению строки в несколько мегабайт. Современные сети могут легко отправить эту большую строку за оставшиеся полсекунды. 1000000us (1 секунда) / 1000000 сообщений будет 1us на сообщение, верно? Неправильно - у всех сообщений была задержка в полсекунды, чтобы позволить очереди заполниться, что привело к увеличению задержки до полсекунды для всех сообщений. ZeroMQ отправляет пакеты намного быстрее, чем каждый 500ms, но увеличение задержки, которое это иллюстрирует, по-прежнему происходит в ZeroMQ, хотя обычно оно составляет несколько ms.

person JSON    schedule 17.11.2015