asyncio - сколько сопрограмм?

Я уже несколько дней борюсь с приложением python, где я ожидаю искать файл или файлы в папке и перебирать каждый файл и каждую запись в нем и создавать объекты, которые будут сохраняться в базе данных Janusgraph. Конкретный OGM, который я использую, требует, чтобы транзакции с базой данных выполнялись асинхронно с использованием asyncio. Я прочитал много блогов, сообщений об asyncio, и я думаю, что понимаю концепцию async, await, задач и т.д ... В моем приложении я определил несколько функций, которые обрабатывают разные части обработки:

  • Получает список всех доступных файлов
  • Выберите один файл для обработки
  • Обходит выбранный файл и читает строку / запись для обработки
  • Получает запись, определяет, анализирует входящие и вызывает несколько других функций, которые отвечают за создание объектов модели, прежде чем они будут сохранены в базе данных. Например, я создаю разные функции: User, Session, Browser, DeviceUsed, Server и т. Д.

Я понимаю (и могу ошибаться), что большое преимущество использования asyncio заключается в ситуациях, когда вызов функции обычно блокируется для ввода-вывода, транзакции базы данных, задержки сети и т. Д.

Итак, мой вопрос: нужно ли мне преобразовать все мои функции в сопрограммы и расписание для выполнения цикла событий или только те, которые будут блокироваться, например, фиксация транзакции в базе данных. Я пробовал этот подход с самого начала, и у меня возникли всевозможные проблемы.


person Cracoras    schedule 04.04.2018    source источник
comment
Если у вас есть существующая синхронная кодовая база, вы обычно можете преобразовать ее, преобразовав все функции, которые выполняют блокирующие вызовы, в асинхронные, а затем проследить цепочку вызывающих до самого верха, чтобы каждая функция, которая хотя бы иногда прямо или косвенно вызывает что-то асинхронное, теперь асинхронный. Но если вы работаете с нуля, обычно лучше подумать об этих асинхронных цепочках, прежде чем что-либо писать.   -  person abarnert    schedule 04.04.2018


Ответы (1)


Итак, мой вопрос в том, нужно ли мне преобразовать все мои функции в сопрограммы и расписание для выполнения цикла событий или только те, которые будут блокировать,

Возможно, вам придется преобразовать большинство из них, но преобразование должно быть в основном механическим, сводящимся к изменению def на async def и добавлению await при вызове других сопрограмм.

Очевидно, вы не можете избежать преобразования тех, которые фактически блокируют, либо переключившись на соответствующий asyncio API, либо используя _ 4_ для тех, у кого его нет. (Разрешение DNS было выдающимся примером последнего.)

Но тогда вам также необходимо преобразовать их вызывающие, потому что вызов сопрограммы из функции блокировки бесполезен, если функция не реализует функциональность, подобную циклу событий. С другой стороны, когда сопрограмма вызывается из другой сопрограммы, все работает, потому что приостановки автоматически распространяются на вершину цепочки. Когда вся цепочка вызовов состоит из сопрограмм, сопрограммы верхнего уровня передаются в цикл событий с использованием _ 5_ или _ 6_.

Конечно, вспомогательные функции, которые не блокируют и не вызывают блокирующие функции, могут безопасно оставаться неасинхронными и вызываются либо синхронизирующим, либо асинхронным кодом без каких-либо различий.


Вышесказанное относится к asyncio, который реализует бесстековые сопрограммы. Другой подход используется greenlet, задачи которого инкапсулируют стек вызовов, что позволяет им переключаться в произвольных местах кода, который использует обычные вызовы функций. Тем не менее, Greenlets немного тяжелее и менее портативны, чем сопрограммы, поэтому сначала я бы преобразовал их в asyncio.

person user4815162342    schedule 05.04.2018
comment
Несколько дополнительных вопросов, на которые вы ответите: (1) У меня есть основная сопрограмма, которая будет в верхней части цепочки, где я читаю записи, я предполагаю, что ее нужно передать в цикл событий. Эта функция, в свою очередь, будет вызывать цепочки для других, которые просто манипулируют данными в памяти или создают объекты. Если я правильно понял ваше утверждение, мне не нужно вызывать последующие с помощью await? Можно ли называть их обычными функциями? (2) Я также предполагаю, что могу кормить верхнюю сопрограмму с помощью asyncio.ensure_future (), верно? - person Cracoras; 05.04.2018
comment
@Cracoras (1) Точно, если вы вызываете простые функции, которые выполняют что-то в памяти, их не нужно преобразовывать в сопрограммы и, следовательно, не нужно ждать - их можно вызывать как обычные функции. (2) Да; для сопрограммы asyncio.ensure_future и loop.create_task эквивалентны, но последний является предполагаемым API, когда известно, что аргумент является сопрограммой. Вы можете думать о create_taskensure_future) как о запуске задачи в фоновом режиме. Также посмотрите asyncio.gather. - person user4815162342; 05.04.2018
comment
Я наконец закончил рефакторинг своего кода и вернул все внутренние функции к обычным функциям. Однако я получаю следующую ошибку: file_stats, db_session = process_function (message_types.index (row [0]), row, file_stats, db_session) TypeError: объект 'coroutine' не повторяется. Эта строка находится внутри последней сопрограммы, которую я вызывается в моем стеке и запускает все оставшиеся функции, связанные с памятью, о которых я упоминал. - person Cracoras; 11.04.2018
comment
@Cracoras Вам нужно await сопрограмму. Если он находится на самом верху цепочки вызовов, вам нужно вызвать его с помощью loop.run_until_complete. - person user4815162342; 11.04.2018
comment
В данном случае process_function () не является сопрограммой, поэтому я ее не жду. Нужно ли мне ? Вызов к нему находится в сопрограмме внизу стека под названием: process_rows (): task_process_rows = asyncio.ensure_future (process_rows (row, file_name, file_stats, db_session)) file_stats = await task_process_rows ‹/br› ‹/br› async def process_rows (строка, имя_файла, file_stats, db_session): ... file_stats, db_session = process_function (message_types.index (row [0]), row, file_stats, db_session) await db_session.flush () - person Cracoras; 11.04.2018
comment
@Cracoras Я думаю, process_function должна быть сопрограммой, потому что она вызывает другие сопрограммы. Но очень сложно понять, что происходит, не видя кода. Можете ли вы создать минимальный воспроизводимый пример, который все еще демонстрирует проблему? - person user4815162342; 11.04.2018
comment
Я пытался вырезать код таким образом, чтобы это имело смысл, но, думаю, это все равно не помогло. Но я проверил его в git, и вот полный код: github .com / marciodebarros / useractivitylogs / blob / master / app /. Ошибка возникает в строке 195, которая является вызовом последней сопрограммы. Оттуда все остальные вызовы относятся к обычным функциям, а не к какому-либо вводу-выводу. Я даже попытался сделать process_function сопрограммой, но получил аналогичную ошибку. Большое спасибо за ваше терпение и вашу помощь. --МД - person Cracoras; 12.04.2018
comment
@Cracoras Проблема в том, что process_function иногда вызывает асинхронные функции, и их нужно ждать. Другими словами, хотя вы можете смешивать обычные и асинхронные функции, асинхронные всегда нужно ждать, поэтому, если вы выполняете отправку, которая проходит через один путь кода, process_function делает, все его вызываемые объекты должны быть асинхронными, и вам нужно жду их. В качестве быстрого исправления вы можете использовать iscoroutine, чтобы увидеть, нужно ли ждать результата, вот так. - person user4815162342; 13.04.2018
comment
Привет, @ user4815162342, после вашего последнего комментария я снова вернулся к коду и понял, что вы имели в виду, говоря, что процесс_функции вызывает сопрограммы. Оказалось, что это не были сопрограммы, а обычные функции. Я вернул их обратно и смог запустить код без каких-либо проблем / исключений. Я все равно проведу дополнительное тестирование, но похоже, что он работает. Большое вам спасибо за вашу помощь и определенно за ваше терпение. Надеюсь, мне больше не придется беспокоить вас этой проблемой ;-) --MD - person Cracoras; 16.04.2018