Я пробую несколько крайних случаев в своем программном обеспечении. Итак, я создал очень простую тестовую среду:
- Сервер RabbitMQ под управлением CentOS 7
- Потребитель сообщений, написанный для .NETCore 2.1 на C #, работающий под CentOS 7
- Отправитель сообщения, написанный для .NETCore 2.1 на C #, работающий под CentOS 7
Я отправляю простое текстовое сообщение каждые 5 секунд. Отправитель и получатель работают на одном компьютере UNIX, в то время как сервер RabbitMQ работает на другом компьютере в сети. Все идет нормально. Теперь я останавливаю свой сервер RabbitMQ с помощью systemctl stop rabbitmq-server
.
Я получаю ошибки отправителя и получателя, чего и ожидалось.
Я перезапускаю сервер RabbitMQ, используя systemctl start rabbitmq-server
.
А теперь самое интересное! Отправитель может восстанавливать и продолжает отправлять сообщения, но потребитель НЕ МОЖЕТ восстановить и не получает сообщения. Они накапливаются на сервере RabbitMQ!
Вот мои записи журнала от отправителя (которые работают должным образом):
2019-01-22 21:18:25.628 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.632 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to any Broker. Retrying in 00:00:05
2019-01-22 21:18:35.444 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 7.' | Num: 7 | Guid: 5345c7e4-61e6-4c79-8179-d4bef7864420'.
2019-01-22 21:18:40.452 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 8.' | Num: 8 | Guid: 3cd8635c-cdfa-45f3-8495-2acb0713d47b'.
2019-01-22 21:18:45.457 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 9.' | Num: 9 | Guid: 099462b8-cd66-40b9-ac10-89c3246819ec'.
2019-01-22 21:18:50.470 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 10.' | Num: 10 | Guid: c25139b2-8e45-4771-9544-830014382e0c'.
2019-01-22 21:18:55.515 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 11.' | Num: 11 | Guid: 90049d91-3805-4aaa-ac18-b61c09164afd'.
2019-01-22 21:19:00.526 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 12.' | Num: 12 | Guid: 108ff318-6a34-4e64-94bd-dafa67aa6717'.
Это показывает последнее сообщение об ошибке, а затем можно увидеть, что EasyNetQ восстановился и может снова доставлять сообщения.
Потребитель сообщения НЕ работает! Вот мои записи в журнале:
2019-01-22 21:18:25.623 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.625 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to any Broker. Retrying in 00:00:05
Вот он сидит и ждет вечно! Похоже, что объект заблокирован, пока сообщения накапливаются на сервере RabbitMQ:
Когда я останавливаю свое потребительское приложение и перезапускаю его, сообщения принимаются.
Я подключаюсь к обоим приложениям (отправителю и потребителю) с помощью следующего кода:
private static IBus SetupRabbitMqConnection(string rabbitServer, string rabbitVHost, ushort rabbitPort, string rabbitUser, string rabbitPwd)
{
Log.Logger.Debug($"Creating a connection to RabbitMQ server '{rabbitServer}' on port {rabbitPort.ToString()} " +
$"using the EasyNetQ library....");
try
{
var connStr = $"host={rabbitServer}:{rabbitPort.ToString()};virtualHost={rabbitVHost};username={rabbitUser};" +
$"password={rabbitPwd};publisherConfirms=true;timeout=30;prefetchcount=1;requestedHeartbeat=30";
var msgBus = RabbitHutch.CreateBus(connStr, x => { });
if (!msgBus.IsConnected)
{
var errMsg = $"Currently not connected to RabbitMQ server '{rabbitServer}'.";
Log.Logger.Error(errMsg);
}
Log.Logger.Debug("Successfully connected to RabbitMQ server.");
return msgBus;
}
catch (Exception ex)
{
Log.Logger.Error($"Error to establish a connection to RabbitMQ server '{rabbitServer}'. Error: {ex}");
throw;
}
}
Программа-потребитель регистрирует слушателей следующим образом:
var msgBus = SetupRabbitMqConnection(rabbitServer, vhost, rabbitPort, rabbitUser, rabbitPwd);
RegisterMsgSubscriptions(msgBus);
private static void RegisterMsgSubscriptions(IBus msgBus)
{
Log.Logger.Debug("Starting to register RabbitMQ message subscriptions...");
try
{
#region Queue declarations
var advancedBus = msgBus.Advanced;
var testQueueOne = new EasyNetQ.Topology.Queue(TestQueueOneName, true);
Log.Logger.Debug("Finished declaring queues.");
#endregion
#region Message Queue Handler registrations
advancedBus.Consume(testQueueOne, registration => registration
.Add<RabbitMessage<TestTextMessageDto>>(MessageProcessor.ProcessRabbitTestMessage));
Log.Logger.Debug("Finished registration of message handlers for several queues.");
#endregion
}
catch (Exception ex)
{
Log.Logger.Error($"Error registering message handler. Error: {ex}");
}
}
Есть идеи, что здесь может пойти не так ?? В производстве у нас более 100 серверов, принимающих сообщения. Эти серверы расположены по всей стране. Сервер RabbitMQ находится в дата-центре. Таким образом, потребляющие серверы ДОЛЖНЫ восстанавливаться, если соединение потеряно, в противном случае его нельзя будет использовать!