Пулы потоков Python - задачи, которые создают подзадачи и ждут их

Скажем, у меня есть исполнитель пула потоков с макс. 10 потоков, и я отправляю ему задачу, которая сама создает другую задачу и, в свою очередь, рекурсивно ожидает ее завершения, пока я не достигну глубины 11.

Пример кода на Python:

import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        f = e.submit(task, depth + 1)
        concurrent.futures.wait([f])


f = e.submit(task, 0)
print f.result()

Приведенный выше код выводит:

started depth 0
started depth 1
started depth 2
started depth 3
started depth 4
started depth 5
started depth 6
started depth 7
started depth 8
started depth 9

и тупики.

Есть ли способ решить эту проблему без создания дополнительных потоков и исполнителей?

Другими словами, как рабочие потоки могут работать над другими задачами во время ожидания?


person saarraz1    schedule 24.05.2015    source источник
comment
stackoverflow.com/questions/1239035/   -  person Tymoteusz Paul    schedule 24.05.2015
comment
Или еще лучше использовать решение высокого уровня, такое как Celery celeryproject.org   -  person Tymoteusz Paul    schedule 24.05.2015
comment
Разве это не тупик, потому что он продолжает вызывать новые потоки, но максимальное количество в пуле составляет всего 10. Ни один из потоков никогда не завершает свою задачу.   -  person Alexander    schedule 24.05.2015
comment
@Alexander - да, но никто из них тоже не работает, они все спят, когда нужно что-то делать. Я ищу способ заставить их перестать ждать и заняться другими делами, чтобы операция могла завершиться.   -  person saarraz1    schedule 24.05.2015
comment
Короткий ответ - нет. По крайней мере, не используя исполнителя. Однако вы можете подумать о рефакторинге кода, чтобы вместо этого использовать сопрограммы. docs.python.org/3/library/asyncio-task.html   -  person Dunes    schedule 24.05.2015
comment
@Dunes Вот и все! coroutines - это именно то средство, которое я искал! Пожалуйста, опубликуйте это как ответ, чтобы я принял его.   -  person saarraz1    schedule 24.05.2015


Ответы (3)


Используя сопрограммы, ваш код можно переписать как:

import asyncio

@asyncio.coroutine
def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # create new task
        t = asyncio.async(task(depth + 1))
        # wait for task to complete
        yield from t
        # get the result of the task
        return t.result()

loop = asyncio.get_event_loop()
result = loop.run_until_complete(task(1))
print(result)
loop.close()

Однако я изо всех сил пытаюсь понять, зачем вам нужен весь этот дополнительный код. В вашем примере кода вы всегда ждете непосредственно результата задачи, поэтому ваш код не будет работать без исполнителя. Например, следующий результат даст тот же результат

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        task(depth + 1)

Я думаю, что этот пример из документации лучше показывает, как асинхронные сопрограммы могут распараллеливать задачи. В этом примере создаются 3 задачи, каждая из которых вычисляет свой факториал. Обратите внимание, как когда каждая задача уступает место другой сопрограмме (в данном случае async.sleep), другой задаче разрешается продолжить выполнение.

import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Вывод:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
person Dunes    schedule 24.05.2015

Нет, если вы хотите избежать тупика, вы не можете ждать будущего от того же исполнителя в задаче.

Единственное, что вы можете сделать в этом примере, - это вернуть будущее, а затем рекурсивно обработать результаты:

import concurrent.futures
import time

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        f = e.submit(task, depth + 1)
        return f


f = e.submit(task, 0)
while isinstance(f.result(), concurrent.futures.Future):
    f = f.result()

print f.result()

Однако было бы лучше вообще избежать такого рекурсивного выполнения.

person mata    schedule 24.05.2015

То, что вы здесь переживаете, вы уже справедливо назвали тупиком. Первый поток, который запускает следующий поток и ожидает, пока он удерживает lock, на котором все последующие задачи будут заблокированы, ожидая освобождения того же lock (чего никогда не бывает в вашем случае). Я бы посоветовал вам запускать свои собственные потоки в задачах вместо использования пула, например:

import concurrent.futures
import threading


class TaskWrapper(threading.Thread):

    def __init__(self, depth, *args, **kwargs):
        self._depth = depth
        self._result = None
        super(TaskWrapper, self).__init__(*args, **kwargs)

    def run(self):
        self._result = task(self._depth)

    def get(self):
        self.join()
        return self._result

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)


def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        t = TaskWrapper(depth + 1)
        t.start()
        return t.get()

f = e.submit(task, 0)
print f.result()
person sirfz    schedule 24.05.2015