Разрешить RabbitMQ и Pika поддерживать соединение всегда открытым

У меня есть сценарий Python, который считывает данные из потока, и когда новая строка считывается, она помещает ее содержимое (строку) в очередь RabbitMQ.

Дело в том, что поток может не отправлять сообщения за 1, 2 или 9 часов или около того, поэтому я хотел бы, чтобы соединение RabbitMQ всегда было открыто.

Проблема в том, что когда я создаю соединение и канал:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
channel = self.connection.channel()
channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')

... и если через час приходит сообщение, я получаю такую ​​ошибку:

  File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/var/opt/rabbitmq-agent.py", line 34, in push_to_queue
    raise Exception("Error sending the message to the queue: " + format(e))
Exception: Error sending the message to the queue: Send message to publisher error: Channel allocation requires an open connection: <SelectConnection CLOSED socket=None params=<ConnectionParameters host=x port=xvirtual_host=/ ssl=False>>

Я полагаю, что соединение между сервером rabbitmq и клиентом было закрыто.

Как мне этого избежать? Я хотел бы иметь "пожалуйста, всегда поддерживайте соединение". Может быть установка сверхбольшого биения в параметрах подключения Пика? Что-то вроде этого:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials, heartbeat=6000))

Любые другие кулеры были бы очень признательны.

заранее спасибо


person Avión    schedule 27.05.2019    source источник


Ответы (2)


Я бы посоветовал вам проверять соединение каждый раз перед отправкой сообщения, и если соединение закрыто, просто подключитесь заново.

if not self.connection or self.connection.is_closed:
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
    channel = self.connection.channel()
    channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
person Toni Sredanović    schedule 27.05.2019

Вы можете попробовать добавить heartbeat в свой ConnectionParameters. Это создаст легкий трафик, посылая биения каждые указанные секунды. Это будет упражнять связи. Некоторые брандмауэры или прокси-серверы очищают неактивные соединения. Даже RabbitMQ имеет тайм-аут для простаивающих соединений.

import pika

# Set the connection parameters to connect to rabbit-server1 on port 5672
# on the / virtual host using the username "guest" and password "guest"
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('rabbit-server1',
                                       5672,
                                       '/',
                                       heartbeat=60,
                                       credentials)

Документацию по pika см. здесь.

Кроме того, у вас должен быть код, предотвращающий отключение от сети. Это всегда может случиться и будет. Таким образом, сразу после пульса есть некоторая обработка исключений, готовая повторно открывать закрытые соединения изящным способом.

person itsafire    schedule 02.07.2019
comment
Это ошибка. Аргумент heartbeat просто сообщает серверу RabbitMQ, что сервер RabbitMQ должен поддерживать соединение до указанного значения (в секундах), начиная с момента последнего запроса от клиента. Клиент должен периодически отправлять запросы пульса (период должен быть меньше, чем указано в опции пульса). Пика не делает этого автоматически. - person Vladimir Chub; 19.03.2020