Python3 pika channel.basic_consume() вызывает слишком много подключений к MySQL

Я использовал pika для подключения к RabbitMQ и получения сообщения, как только я запускаю скрипт в среде Ubuntu Prod, он работает, как и ожидалось, но открывает соединение mysql и никогда не закрывает их и заканчивается слишком большим количеством соединений на сервере mysql.

Буду признателен за любую рекомендацию по приведенному ниже коду, так как не могу понять, что происходит не так. Заранее спасибо.

Поток следующий

  1. Запуск pika на Python3
  2. Подписывайтесь на канал и ждите сообщений
  3. В обратном вызове я выполняю различные проверки и сохраняю или обновляю данные внутри MySql.
  4. Результат, показывающий проблему, - это скриншот из ubuntu htop в конце вопроса, который показывает новое соединение на MySql и продолжает добавлять их вверху.

Версия Пика = 0.13.0

Для MySql я использую pymysql.

Скрипт Пика

def main():
    credentials = pika.PlainCredentials(tunnel['queue']['name'], tunnel['queue']['password'])

    while True:
        try:
            cp = pika.ConnectionParameters(
                host=tunnel['queue']['host'],
                port=tunnel['queue']['port'],
                credentials=credentials,
                ssl=tunnel['queue']['ssl'],
                heartbeat=600,
                blocked_connection_timeout=300
            )

            connection = pika.BlockingConnection(cp)
            channel = connection.channel()

            def callback(ch, method, properties, body):
                if 'messageType' in properties.headers:
                    message_type = properties.headers['messageType']

                    if events.get(message_type):
                        result = Descriptors._reflection.ParseMessage(events[message_type]['decode'], body)
                        if result:
                            result = protobuf_to_dict(result)
                            model.write_response(external_response=result, message_type=message_type)
                    else:
                        app_log.warning('Message type not in allowed list = ' + str(message_type))
                        app_log.warning('continue listening...')

            channel.basic_consume(callback, queue=tunnel['queue']['name'], no_ack=True)
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
                connection.close()
                break
        except pika.connection.exceptions.ConnectionClosed as e:
            app_log.error('ConnectionClosed :: %s' % str(e))
            continue
        except pika.connection.exceptions.AMQPChannelError as e:
            app_log.error('AMQPChannelError :: %s' % str(e))
            continue
        except Exception as e:
            app_log.error('Connection was closed, retrying... %s' % str(e))
            continue


if __name__ == '__main__':
    main()

Внутри скрипта у меня есть модель, которая вставляет или обновляет базу данных, код ниже

def write_response(self, external_response, message_type):
    table_name = events[message_type]['table_name']
    original_response = external_response[events[message_type]['response']]
    if isinstance(original_response, list):
        external_response = []
        for o in original_response:
            record = self.map_keys(o, message_type, events[message_type].get('values_fix', {}))
            external_response.append(self.validate_fields(record))
    else:
        external_response = self.map_keys(original_response, message_type, events[message_type].get('values_fix', {}))
        external_response = self.validate_fields(external_response)

    if not self.mysql.open:
        self.mysql.ping(reconnect=True)

    with self.mysql.cursor() as cursor:
        if isinstance(original_response, list):
            for e in external_response:
                id_name = events[message_type]['id_name']
                filters = {id_name: e[id_name]}
                self.event(
                    cursor=cursor,
                    table_name=table_name,
                    filters=filters,
                    external_response=e,
                    message_type=message_type,
                    event_id=e[id_name],
                    original_response=e  # not required here
                )
        else:
            id_name = events[message_type]['id_name']
            filters = {id_name: external_response[id_name]}
            self.event(
                cursor=cursor,
                table_name=table_name,
                filters=filters,
                external_response=external_response,
                message_type=message_type,
                event_id=external_response[id_name],
                original_response=original_response
            )
    cursor.close()
    self.mysql.close()

    return

В Ubuntu я использую systemd для запуска скрипта и перезапуска в случае, если что-то пойдет не так, ниже файл systemd

[Unit]
Description=Pika Script
Requires=stunnel4.service
Requires=mysql.service
Requires=mongod.service

[Service]
User=user
Group=group
WorkingDirectory=/home/pika_script
ExecStart=/home/user/venv/bin/python pika_script.py
Restart=always

[Install]
WantedBy=multi-user.target

Изображение из ubuntu htop, как MySql продолжает добавлять в список и никогда не закрывает его введите здесь описание изображения

Ошибка

tornado_mysql.err.OperationalError: (1040, 'Too many connections')

person aaa    schedule 03.07.2020    source источник


Ответы (3)


Я бы сказал, что увеличение соединения может решить вашу проблему умеренно.

1-й выяснить, почему приложение не закрывает соединение после завершения задачи.

2. Любые медленные запросы/вызовы в БД и исправить их, если таковые имеются.

3-й, учитывая отсутствие медленных запросов/вызовов в БД, а также приложение закрывает соединение/поток после немедленного завершения задачи, а затем подумайте об игре с wait_timeout на стороне mysql.

person Vaibhav    schedule 12.07.2020

Согласно этому ответу, если у вас есть MySQL 5.7 и 5.8:

Стоит знать, что если у вас закончится доступное дисковое пространство на вашем серверном разделе или диске, это также приведет к тому, что MySQL вернет эту ошибку. Если вы уверены, что это не фактическое количество подключенных пользователей, то следующим шагом будет проверка наличия свободного места на диске/разделе вашего сервера MySQL.

Из той же ветки. Вы можете проверить и увеличить количество соединений MySQL.

person rok    schedule 06.07.2020
comment
увеличить количество подключений не является решением, так как при мониторинге я мгновенно вижу каждые 10 секунд новое подключение - person aaa; 06.07.2020
comment
вы говорите о выходе htop? это не mysl клиентские новые подключения. это mysqld несколько процессов на сервере. у вас проблемы и каждый раз перезапускаете mysqld? - person rok; 07.07.2020
comment
да, я перезапускаю его каждый раз, потому что слишком много процессов и через некоторое время все вылетает - person aaa; 07.07.2020

Я нашел проблему, отправив сообщение, если это поможет кому-то еще.

проблема заключалась в том, что mysqld вошел в бесконечный цикл, пытаясь создать индексацию для определенной базы данных, после того, как обнаружил, какая база данных пыталась создать индексы, и никогда не добивалась успеха, пытаясь снова и снова.

Решение состояло в том, чтобы удалить базу данных и создать ее заново, и процесс mysqld вернулся в нормальное состояние. и бесконечный цикл для создания индексов также исчез.

person aaa    schedule 10.07.2020