Как завершить длительные вычисления (задача, связанная с процессором) в Python с помощью asyncio и concurrent.futures.ProcessPoolExecutor?

Аналогичный вопрос (но ответ у меня не работает): Как отменить длительные подпроцессы, выполняющиеся с помощью concurrent.futures.ProcessPoolExecutor?

В отличие от связанного выше вопроса и предоставленного решения, в моем случае само вычисление довольно длительное (связано с процессором) и не может быть выполнено в цикле, чтобы проверить, произошло ли какое-либо событие.

Уменьшенная версия кода ниже:

import asyncio
import concurrent.futures as futures
import time

class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = asyncio.gather(*self._tasks)
            # Send the reasoner tasks to main monitor task
            asyncio.gather(self.sample_main_loop(_reasoner_tasks))
            self._loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self._loop.close()

    async def sample_main_loop(self, reasoner_tasks):
        """This is the main monitor task"""
        await asyncio.wait_for(reasoner_tasks, None)
        for task in self._long_running_tasks:
            try:
                await asyncio.wait_for(task, 10)
            except asyncio.TimeoutError:
                print("Oops. Some long operation timed out.")
                task.cancel()  # Doesn't cancel and has no effect
                task.set_result(None)  # Doesn't seem to have an effect

        self._lmz_executor.shutdown()
        self._loop.stop()
        print('And now I am done. Yay!')

    async def bot_reasoning_loop(self, bot):
        import math

        _exec_count = 0
        _sleepy_time = 15
        _max_runs = math.floor(self._max_execution_time / _sleepy_time)

        self._long_running_tasks.append(
            self._loop.run_in_executor(
                    self._lmz_executor, really_long_process, _sleepy_time))

        while time.monotonic() < self._max_execution_time:
            print("Bot#{}: thinking for {}s. Run {}/{}".format(
                    bot, _sleepy_time, _exec_count, _max_runs))
            await asyncio.sleep(_sleepy_time)
            _exec_count += 1

        print("Bot#{} Finished Thinking".format(bot))

def really_long_process(sleepy_time):
    print("I am a really long computation.....")
    _large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(_large_val))

if __name__ == "__main__":
    sim = Simulator()
    sim.initialise()
    sim.run()

Идея состоит в том, что существует основной цикл моделирования, который запускает и контролирует три потока бота. Затем каждый из этих потоков ботов выполняет некоторые рассуждения, но также запускает действительно длительный фоновый процесс с использованием ProcessPoolExecutor, который может в конечном итоге работать дольше своего собственного порогового / максимального времени выполнения для размышлений о вещах.

Как вы можете видеть в приведенном выше коде, я попытался .cancel() выполнить эти задачи, когда истекло время ожидания. Хотя это на самом деле не отменяет фактическое вычисление, которое продолжает происходить в фоновом режиме, и цикл asyncio не завершается до тех пор, пока не завершатся все длительные вычисления.

Как мне завершить такие длительные вычисления, связанные с процессором, внутри метода?

Другие похожие вопросы SO, но не обязательно связанные или полезные:

  1. asyncio: возможно ли отменить будущее, запущенное Исполнителем?
  2. Как завершить одиночная асинхронная задача в многопроцессорной обработке, если эта единственная асинхронная задача превышает пороговое время в Python
  3. Асинхронная многопроцессорная обработка с рабочим пулом в Python: как продолжить работу после тайм-аута?

person Darkfish    schedule 22.10.2018    source источник


Ответы (1)


Как мне завершить такие длительные вычисления, связанные с процессором, внутри метода?

Подход, который вы пробовали, не работает, потому что фьючерсы, возвращенные ProcessPoolExecutor не подлежат отмене. Хотя run_in_executor пытается распространить отмену он просто когда-то игнорируется Future.cancel задача начинает выполняться.

Для этого нет фундаментальной причины. В отличие от потоков, процессы могут быть безопасно завершены, поэтому ProcessPoolExecutor.submit вполне может вернуть будущее, cancel которое завершило соответствующий процесс. Сопрограммы asyncio определили семантику отмены и будут автоматически использовать ее. К сожалению, ProcessPoolExecutor.submit возвращает обычный concurrent.futures.Future, предполагающий наименьший общий знаменатель и рассматривает бегущее будущее как неприкосновенное.

В результате, чтобы отменить задачи, выполняемые в подпроцессах, нужно полностью обойти ProcessPoolExecutor и управлять собственными процессами. Проблема в том, как это сделать, не выполняя повторно половину multiprocessing. Один из вариантов, предлагаемых стандартной библиотекой, - (ab) использовать multiprocessing.Pool для этой цели, потому что он поддерживает надежное завершение рабочих процессов. CancellablePool может работать следующим образом:

  • Вместо того, чтобы порождать фиксированное количество процессов, создайте фиксированное количество пулов с 1 рабочим.
  • Назначьте задачи пулам из сопрограммы asyncio. Если сопрограмма отменяется в ожидании завершения задачи в другом процессе, завершить однопроцессный пул и создать новый.
  • Поскольку все координируется из одного потока asyncio, не беспокойтесь об условиях гонки, таких как случайное завершение процесса, который уже начал выполнение другой задачи. (Этого нужно было бы предотвратить, если бы можно было поддерживать отмену в ProcessPoolExecutor.)

Вот пример реализации этой идеи:

import asyncio
import multiprocessing

class CancellablePool:
    def __init__(self, max_workers=3):
        self._free = {self._new_pool() for _ in range(max_workers)}
        self._working = set()
        self._change = asyncio.Event()

    def _new_pool(self):
        return multiprocessing.Pool(1)

    async def apply(self, fn, *args):
        """
        Like multiprocessing.Pool.apply_async, but:
         * is an asyncio coroutine
         * terminates the process if cancelled
        """
        while not self._free:
            await self._change.wait()
            self._change.clear()
        pool = usable_pool = self._free.pop()
        self._working.add(pool)

        loop = asyncio.get_event_loop()
        fut = loop.create_future()
        def _on_done(obj):
            loop.call_soon_threadsafe(fut.set_result, obj)
        def _on_err(err):
            loop.call_soon_threadsafe(fut.set_exception, err)
        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)

        try:
            return await fut
        except asyncio.CancelledError:
            pool.terminate()
            usable_pool = self._new_pool()
        finally:
            self._working.remove(pool)
            self._free.add(usable_pool)
            self._change.set()

    def shutdown(self):
        for p in self._working | self._free:
            p.terminate()
        self._free.clear()

Минималистичный тестовый пример, показывающий отмену:

def really_long_process():
    print("I am a really long computation.....")
    large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(large_val))

async def main():
    loop = asyncio.get_event_loop()
    pool = CancellablePool()

    tasks = [loop.create_task(pool.apply(really_long_process))
             for _ in range(5)]
    for t in tasks:
        try:
            await asyncio.wait_for(t, 1)
        except asyncio.TimeoutError:
            print('task timed out and cancelled')
    pool.shutdown()

asyncio.get_event_loop().run_until_complete(main())

Обратите внимание на то, что использование ЦП никогда не превышает 3 ядер и как оно начинает снижаться ближе к концу теста, указывая на то, что процессы завершаются, как и ожидалось.

Чтобы применить его к коду из вопроса, сделайте self._lmz_executor экземпляром CancellablePool и измените self._loop.run_in_executor(...) на self._loop.create_task(self._lmz_executor.apply(...)).

person user4815162342    schedule 22.10.2018
comment
Это замечательно и работает именно так, как я хотел. Жаль, что библиотека asyncio на самом деле не завершает процессы, запущенные в ProcessPoolExecutor. Это имеет смысл для ThreadPoolExecutor. Возможно, я оставлю такое обсуждение для списка рассылки Python. Спасибо. - person Darkfish; 24.10.2018
comment
@Darkfish Я думаю, что это будет стоящее предложение для трекера ошибок Python. (Хотя на самом деле это может быть проигнорировано, если оно не сопровождается патчем, реализующим его.) Также будьте осторожны перед использованием tihs в продакшене. Моя первая попытка состояла в том, чтобы использовать ProcessPoolExecutor в качестве базового пула, но этот класс был очень недоволен, когда кто-то завершил пул с отложенными задачами - его поток управления начал вызывать неверный файловый дескриптор и аналогичные исключения, и они не могли не быть пойманным. multiprocessing справляется с этим изящно, но я не уверен, что кто-то на самом деле это проверяет. - person user4815162342; 24.10.2018
comment
Хорошо знать. На данный момент я реализую только исследовательский прототип, поэтому, полагаю, мне придется тщательно протестировать его перед выпуском. Возможно, когда я закончу с этим, я займусь исправлением или, по крайней мере, начну обсуждение списка Python. Спасибо друг. - person Darkfish; 25.10.2018