Аналогичный вопрос (но ответ у меня не работает): Как отменить длительные подпроцессы, выполняющиеся с помощью concurrent.futures.ProcessPoolExecutor?
В отличие от связанного выше вопроса и предоставленного решения, в моем случае само вычисление довольно длительное (связано с процессором) и не может быть выполнено в цикле, чтобы проверить, произошло ли какое-либо событие.
Уменьшенная версия кода ниже:
import asyncio
import concurrent.futures as futures
import time
class Simulator:
def __init__(self):
self._loop = None
self._lmz_executor = None
self._tasks = []
self._max_execution_time = time.monotonic() + 60
self._long_running_tasks = []
def initialise(self):
# Initialise the main asyncio loop
self._loop = asyncio.get_event_loop()
self._loop.set_default_executor(
futures.ThreadPoolExecutor(max_workers=3))
# Run separate processes of long computation task
self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)
def run(self):
self._tasks.extend(
[self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
)
try:
# Gather bot reasoner tasks
_reasoner_tasks = asyncio.gather(*self._tasks)
# Send the reasoner tasks to main monitor task
asyncio.gather(self.sample_main_loop(_reasoner_tasks))
self._loop.run_forever()
except KeyboardInterrupt:
pass
finally:
self._loop.close()
async def sample_main_loop(self, reasoner_tasks):
"""This is the main monitor task"""
await asyncio.wait_for(reasoner_tasks, None)
for task in self._long_running_tasks:
try:
await asyncio.wait_for(task, 10)
except asyncio.TimeoutError:
print("Oops. Some long operation timed out.")
task.cancel() # Doesn't cancel and has no effect
task.set_result(None) # Doesn't seem to have an effect
self._lmz_executor.shutdown()
self._loop.stop()
print('And now I am done. Yay!')
async def bot_reasoning_loop(self, bot):
import math
_exec_count = 0
_sleepy_time = 15
_max_runs = math.floor(self._max_execution_time / _sleepy_time)
self._long_running_tasks.append(
self._loop.run_in_executor(
self._lmz_executor, really_long_process, _sleepy_time))
while time.monotonic() < self._max_execution_time:
print("Bot#{}: thinking for {}s. Run {}/{}".format(
bot, _sleepy_time, _exec_count, _max_runs))
await asyncio.sleep(_sleepy_time)
_exec_count += 1
print("Bot#{} Finished Thinking".format(bot))
def really_long_process(sleepy_time):
print("I am a really long computation.....")
_large_val = 9729379273492397293479237492734 ** 344323
print("I finally computed this large value: {}".format(_large_val))
if __name__ == "__main__":
sim = Simulator()
sim.initialise()
sim.run()
Идея состоит в том, что существует основной цикл моделирования, который запускает и контролирует три потока бота. Затем каждый из этих потоков ботов выполняет некоторые рассуждения, но также запускает действительно длительный фоновый процесс с использованием ProcessPoolExecutor
, который может в конечном итоге работать дольше своего собственного порогового / максимального времени выполнения для размышлений о вещах.
Как вы можете видеть в приведенном выше коде, я попытался .cancel()
выполнить эти задачи, когда истекло время ожидания. Хотя это на самом деле не отменяет фактическое вычисление, которое продолжает происходить в фоновом режиме, и цикл asyncio
не завершается до тех пор, пока не завершатся все длительные вычисления.
Как мне завершить такие длительные вычисления, связанные с процессором, внутри метода?
Другие похожие вопросы SO, но не обязательно связанные или полезные:
- asyncio: возможно ли отменить будущее, запущенное Исполнителем?
- Как завершить одиночная асинхронная задача в многопроцессорной обработке, если эта единственная асинхронная задача превышает пороговое время в Python
- Асинхронная многопроцессорная обработка с рабочим пулом в Python: как продолжить работу после тайм-аута?