Я использовал pika для подключения к RabbitMQ и получения сообщения, как только я запускаю скрипт в среде Ubuntu Prod, он работает, как и ожидалось, но открывает соединение mysql и никогда не закрывает их и заканчивается слишком большим количеством соединений на сервере mysql.
Буду признателен за любую рекомендацию по приведенному ниже коду, так как не могу понять, что происходит не так. Заранее спасибо.
Поток следующий
- Запуск pika на Python3
- Подписывайтесь на канал и ждите сообщений
- В обратном вызове я выполняю различные проверки и сохраняю или обновляю данные внутри MySql.
- Результат, показывающий проблему, - это скриншот из ubuntu htop в конце вопроса, который показывает новое соединение на MySql и продолжает добавлять их вверху.
Версия Пика = 0.13.0
Для MySql я использую pymysql.
Скрипт Пика
def main():
credentials = pika.PlainCredentials(tunnel['queue']['name'], tunnel['queue']['password'])
while True:
try:
cp = pika.ConnectionParameters(
host=tunnel['queue']['host'],
port=tunnel['queue']['port'],
credentials=credentials,
ssl=tunnel['queue']['ssl'],
heartbeat=600,
blocked_connection_timeout=300
)
connection = pika.BlockingConnection(cp)
channel = connection.channel()
def callback(ch, method, properties, body):
if 'messageType' in properties.headers:
message_type = properties.headers['messageType']
if events.get(message_type):
result = Descriptors._reflection.ParseMessage(events[message_type]['decode'], body)
if result:
result = protobuf_to_dict(result)
model.write_response(external_response=result, message_type=message_type)
else:
app_log.warning('Message type not in allowed list = ' + str(message_type))
app_log.warning('continue listening...')
channel.basic_consume(callback, queue=tunnel['queue']['name'], no_ack=True)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
break
except pika.connection.exceptions.ConnectionClosed as e:
app_log.error('ConnectionClosed :: %s' % str(e))
continue
except pika.connection.exceptions.AMQPChannelError as e:
app_log.error('AMQPChannelError :: %s' % str(e))
continue
except Exception as e:
app_log.error('Connection was closed, retrying... %s' % str(e))
continue
if __name__ == '__main__':
main()
Внутри скрипта у меня есть модель, которая вставляет или обновляет базу данных, код ниже
def write_response(self, external_response, message_type):
table_name = events[message_type]['table_name']
original_response = external_response[events[message_type]['response']]
if isinstance(original_response, list):
external_response = []
for o in original_response:
record = self.map_keys(o, message_type, events[message_type].get('values_fix', {}))
external_response.append(self.validate_fields(record))
else:
external_response = self.map_keys(original_response, message_type, events[message_type].get('values_fix', {}))
external_response = self.validate_fields(external_response)
if not self.mysql.open:
self.mysql.ping(reconnect=True)
with self.mysql.cursor() as cursor:
if isinstance(original_response, list):
for e in external_response:
id_name = events[message_type]['id_name']
filters = {id_name: e[id_name]}
self.event(
cursor=cursor,
table_name=table_name,
filters=filters,
external_response=e,
message_type=message_type,
event_id=e[id_name],
original_response=e # not required here
)
else:
id_name = events[message_type]['id_name']
filters = {id_name: external_response[id_name]}
self.event(
cursor=cursor,
table_name=table_name,
filters=filters,
external_response=external_response,
message_type=message_type,
event_id=external_response[id_name],
original_response=original_response
)
cursor.close()
self.mysql.close()
return
В Ubuntu я использую systemd для запуска скрипта и перезапуска в случае, если что-то пойдет не так, ниже файл systemd
[Unit]
Description=Pika Script
Requires=stunnel4.service
Requires=mysql.service
Requires=mongod.service
[Service]
User=user
Group=group
WorkingDirectory=/home/pika_script
ExecStart=/home/user/venv/bin/python pika_script.py
Restart=always
[Install]
WantedBy=multi-user.target
Изображение из ubuntu htop, как MySql продолжает добавлять в список и никогда не закрывает его
Ошибка
tornado_mysql.err.OperationalError: (1040, 'Too many connections')