От concurrent.futures к asyncio

У меня две проблемы с concurrent.futures:

Как сломать time.sleep() в python concurrent.futures?

Вывод: time.sleep() нельзя прервать. Одно из решений: вы можете написать вокруг него цикл и делать короткие засыпания.

См. Как разбить time.sleep() в питоне. concurrent.futures

Индивидуальные тайм-ауты для concurrent.futures?

Вывод: индивидуальные тайм-ауты должны быть реализованы пользователем. Например: для каждого тайм-аута вы можете вызывать wait() .

См. Индивидуальные тайм-ауты для concurrent.futures

Вопрос

Решает ли asyncio эти проблемы?


person guettli    schedule 29.07.2016    source источник
comment
Не могли бы вы для самодостаточности резюмировать здесь два других вопроса?   -  person deceze♦    schedule 29.07.2016


Ответы (2)


В асинхронной модели выполнение планируется и координируется циклом обработки событий. Чтобы отменить выполнение приостановленной в данный момент задачи, вам нужно просто не возобновлять ее. Хотя на практике это работает немного иначе, должно быть очевидно, что это делает отмену приостановленной задачи простой в теории.

Индивидуальные тайм-ауты, безусловно, возможны таким же образом: всякий раз, когда вы приостанавливаете сопрограмму для ожидания результата, вы хотите указать значение тайм-аута. Цикл событий гарантирует отмену ожидающей задачи, когда этот тайм-аут достигнут, а задача еще не завершена.

Некоторые образцы бетона:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
   ...
concurrent.futures._base.CancelledError

На практике это может быть реализовано примерно так:

class Foo:
    task = None

    async def sleeper(self):
        self.task = asyncio.sleep(60)
        try:
            await self.task
        except concurrent.futures.CancelledError:
            raise NotImplementedError

Пока этот метод спит, кто-то другой может вызвать foo.task.cancel(), чтобы разбудить сопрограмму и позволить ей обработать отмену. В качестве альтернативы тот, кто вызывает sleeper(), может отменить это напрямую, не давая ему возможности очиститься.

Установка тайм-аутов также проста:

>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
   ...
concurrent.futures._base.TimeoutError

В частности, в контексте тайм-аутов HTTP-запросов см. http://aiohttp.readthedocs.io:

async def fetch_page(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.read()

with aiohttp.ClientSession(loop=loop) as session:
    content = loop.run_until_complete(fetch_page(session, 'http://python.org'))

Очевидно, что каждый вызов fetch_page может выбрать свое собственное значение aiohttp.Timeout, и каждый отдельный экземпляр будет генерировать свое собственное исключение, когда этот тайм-аут будет достигнут.

person deceze♦    schedule 29.07.2016
comment
Вау, отличный ответ. Спасибо. Представьте, что я использую модуль подпроцесса (я читал, что он поддерживается), затем я хочу как-то завершить подпроцесс. У вас есть подсказка, как это сделать? - person guettli; 29.07.2016
comment
Не на самом деле нет. Вы должны открыть новый вопрос для этого. - person deceze♦; 29.07.2016
comment
@guettli Взгляните на модуль asyncio.subprocess и те примеры. - person Vincent; 01.08.2016

Вы можете сделать рейз сразу в его исключении (с asyncio.CancelledError).

Я использую этот метод для улова, чтобы преодолеть его:

import asyncio

async def worker():
    try:
        # await for some coroutine process
    except asyncio.CancelledError:
        # Do stuff
        raise asyncio.CancelledError()
    except Exception as exc:
        # Do stuff
        print(exc)
    finally:
        await asyncio.sleep(2)
person Benyamin Jafari    schedule 06.01.2019