Моя компания изучает возможность использования ZeroMQ в качестве транспортного механизма. Сначала я проверил производительность, чтобы понять, с чем я играю.
Итак, я создал приложение, сравнивающее настройку zmq «дилер-дилер» с winsock. Я заверил, что отправка синхронных сообщений от клиента на сервер и затем вычисление среднего значения обходятся мне в одно и то же время.
Вот сервер, на котором запущен winsock:
DWORD RunServerWINSOCKTest(DWORD dwPort)
{
WSADATA wsaData;
int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iRet != NO_ERROR)
{
printf("WSAStartup failed with error: %d\n", iRet);
return iRet;
}
struct addrinfo hints;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
struct addrinfo *result = NULL;
iRet = getaddrinfo(NULL, std::to_string(dwPort).c_str(), &hints, &result);
if (iRet != 0)
{
WSACleanup();
return iRet;
}
SOCKET ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
if (ListenSocket == INVALID_SOCKET)
{
freeaddrinfo(result);
WSACleanup();
return WSAGetLastError();
}
iRet = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
if (iRet == SOCKET_ERROR)
{
freeaddrinfo(result);
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
freeaddrinfo(result);
iRet = listen(ListenSocket, SOMAXCONN);
if (iRet == SOCKET_ERROR)
{
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
while (true)
{
SOCKET ClientSocket = accept(ListenSocket, NULL, NULL);
if (ClientSocket == INVALID_SOCKET)
{
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
char value = 0;
setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));
char recvbuf[DEFAULT_BUFLEN];
int recvbuflen = DEFAULT_BUFLEN;
do {
iRet = recv(ClientSocket, recvbuf, recvbuflen, 0);
if (iRet > 0) {
// Echo the buffer back to the sender
int iSendResult = send(ClientSocket, recvbuf, iRet, 0);
if (iSendResult == SOCKET_ERROR)
{
closesocket(ClientSocket);
WSACleanup();
return WSAGetLastError();
}
}
else if (iRet == 0)
printf("Connection closing...\n");
else {
closesocket(ClientSocket);
WSACleanup();
return 1;
}
} while (iRet > 0);
iRet = shutdown(ClientSocket, SD_SEND);
if (iRet == SOCKET_ERROR)
{
closesocket(ClientSocket);
WSACleanup();
return WSAGetLastError();
}
closesocket(ClientSocket);
}
closesocket(ListenSocket);
return WSACleanup();
}
Вот клиент, на котором запущен winsock:
DWORD RunClientWINSOCKTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
WSADATA wsaData;
int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iRet != NO_ERROR)
{
return iRet;
}
SOCKET ConnectSocket = INVALID_SOCKET;
struct addrinfo *result = NULL, *ptr = NULL, hints;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int iResult = getaddrinfo(strAddress.c_str(), std::to_string(dwPort).c_str(), &hints, &result);
if (iResult != 0) {
WSACleanup();
return 1;
}
for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {
ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (ConnectSocket == INVALID_SOCKET) {
WSACleanup();
return 1;
}
iResult = connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
if (iResult == SOCKET_ERROR) {
closesocket(ConnectSocket);
ConnectSocket = INVALID_SOCKET;
continue;
}
break;
}
freeaddrinfo(result);
if (ConnectSocket == INVALID_SOCKET) {
WSACleanup();
return 1;
}
// Statistics
UINT64 uint64BytesTransmitted = 0;
UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
UINT64 uint64WaitForResponse = 0;
DWORD dwMessageCount = 1000000;
CHAR cRecvMsg[DEFAULT_BUFLEN];
SecureZeroMemory(&cRecvMsg, DEFAULT_BUFLEN);
std::string strSendMsg(dwMessageSize, 'X');
for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
{
int iRet = send(ConnectSocket, strSendMsg.data(), strSendMsg.size(), 0);
if (iRet == SOCKET_ERROR) {
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
uint64BytesTransmitted += strSendMsg.size();
UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
iRet = recv(ConnectSocket, cRecvMsg, DEFAULT_BUFLEN, 0);
if (iRet < 1)
{
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
std::string strMessage(cRecvMsg);
if (strMessage.compare(strSendMsg) == 0)
{
uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
}
else
{
return NO_ERROR;
}
}
UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
iResult = shutdown(ConnectSocket, SD_SEND);
if (iResult == SOCKET_ERROR) {
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
closesocket(ConnectSocket);
return WSACleanup();
}
Вот сервер под управлением ZMQ (дилер)
DWORD RunServerZMQTest(DWORD dwPort)
{
try
{
zmq::context_t context(1);
zmq::socket_t server(context, ZMQ_DEALER);
// Set options here
std::string strIdentity = s_set_id(server);
printf("Created server connection with ID: %s\n", strIdentity.c_str());
std::string strConnect = "tcp://*:" + std::to_string(dwPort);
server.bind(strConnect.c_str());
bool bRunning = true;
while (bRunning)
{
std::string strMessage = s_recv(server);
if (!s_send(server, strMessage))
{
return NO_ERROR;
}
}
}
catch (zmq::error_t& e)
{
return (DWORD)e.num();
}
return NO_ERROR;
}
Вот клиент под управлением ZMQ (дилер)
DWORD RunClientZMQTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
try
{
zmq::context_t ctx(1);
zmq::socket_t client(ctx, ZMQ_DEALER); // ZMQ_REQ
// Set options here
std::string strIdentity = s_set_id(client);
std::string strConnect = "tcp://" + strAddress + ":" + std::to_string(dwPort);
client.connect(strConnect.c_str());
if(s_send(client, "INIT"))
{
std::string strMessage = s_recv(client);
if (strMessage.compare("INIT") == 0)
{
printf("Client[%s] connected to: %s\n", strIdentity.c_str(), strConnect.c_str());
}
else
{
return NO_ERROR;
}
}
else
{
return NO_ERROR;
}
// Statistics
UINT64 uint64BytesTransmitted = 0;
UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
UINT64 uint64WaitForResponse = 0;
DWORD dwMessageCount = 10000000;
std::string strSendMsg(dwMessageSize, 'X');
for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
{
if (s_send(client, strSendMsg))
{
uint64BytesTransmitted += strSendMsg.size();
UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
std::string strRecvMsg = s_recv(client);
if (strRecvMsg.compare(strSendMsg) == 0)
{
uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
}
else
{
return NO_ERROR;
}
}
else
{
return NO_ERROR;
}
}
UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
}
catch (zmq::error_t& e)
{
return (DWORD)e.num();
}
return NO_ERROR;
}
Я запускаю тест локально с размером сообщения 5 байт и получаю следующий результат:
WINSOCK
Messages sent: 1 000 000
Time elapsed (us): 48 019 415
Time elapsed (s): 48.019 415
Message size (bytes): 5
Msg/s: 20 825
Bytes/s: 104 125
Mb/s: 0.099
Total response time (us): 24 537 376
Average repsonse time (us): 24.0
и
ZeroMQ
Messages sent: 1 000 000
Time elapsed (us): 158 290 708
Time elapsed (s): 158.290 708
Message size (bytes): 5
Msg/s: 6 317
Bytes/s: 31 587
Mb/s: 0.030
Total response time (us): 125 524 178
Average response time (us): 125.0
Может ли кто-нибудь объяснить, почему при использовании ZMQ среднее время отклика намного выше?
Цель состоит в том, чтобы найти установку, в которой я могу отправлять и получать сообщения асинхронно без необходимости отвечать. Если это может быть достигнуто с помощью другой настройки, чем дилер-дилер, пожалуйста, дайте мне знать!