concurrent.futures.ThreadPoolExecutor.map (): тайм-аут не работает

import concurrent.futures
import time 

def process_one(i):
    try:                                                                             
        print("dealing with {}".format(i))                                           
        time.sleep(50)
        print("{} Done.".format(i))                                                  
    except Exception as e:                                                           
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: 
        executor.map(process_one,
                range(100),                                                          
                timeout=3)                                                           


if __name__ == '__main__':                                                           
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:                                                           
        print(e)      

В документах говорится:

Возвращенный итератор вызывает concurrent.futures.TimeoutError, если вызывается __next__() и результат недоступен через timeout секунд после исходного вызова Executor.map()

Но здесь скрипт не вызвал никаких исключений и продолжал ждать. Какие-либо предложения?


person Hao Wang    schedule 19.07.2018    source источник
comment
Вы пытаетесь убить зависшие задания или хотите, чтобы весь вызов process_many занимал ~ 3 секунды или меньше?   -  person arachnivore    schedule 25.07.2018
comment
@arachnivore Убейте задания, которые вешают, и освободите потоки, которые они занимают.   -  person Hao Wang    schedule 25.07.2018
comment
Какая версия Python?   -  person Mr_and_Mrs_D    schedule 27.07.2018


Ответы (2)


Как указано в документации, ошибка тайм-аута будет возникать только в том случае, если вы вызываете метод __next__() на карте. Чтобы вызвать этот метод, вы можете, например, преобразовать вывод в список:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    print('main: starting')
    try:
        # without this conversion to a list, the timeout error is not raised
        real_results = list(results) 
    except futures._base.TimeoutError:
        print("TIMEOUT")

Выход:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5

Здесь n-я задача бездействует в течение n секунд, поэтому время ожидания увеличивается после завершения задачи 2.


ИЗМЕНИТЬ: если вы хотите завершить задачи, которые не были завершены, вы можете попробовать ответы в этот вопрос (хотя они не используют ThreadPoolExecutor.map()), или вы можете просто проигнорировать возвращаемые значения из других задач и позволить им закончить:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures._base.TimeoutError:
        print("TIMEOUT")
    print(outputs)

Выход:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5
person TrakJohnson    schedule 22.07.2018
comment
задача (n) всегда будет выполняться (печать выполняется с n). Есть ли способ прервать его в случае TimeoutException? Я также попробовал генераторный способ, явно вызвав __next __ (); с тем же результатом. - person Hao Wang; 23.07.2018
comment
@HaoWang Я отредактировал свой ответ, чтобы решить эту проблему. Однако я только что понял, что второе решение работает только в том случае, если ваши задачи соответствуют времени, т.е. если задача после имеет большую задержку - что делает ее очень непрактичной. Я попробую найти что-нибудь еще. - person TrakJohnson; 23.07.2018

comment
В моем случае существует огромный пул URL-адресов, я хотел бы пробовать их (получить содержимое каждой страницы) как можно больше, но не возражаю отказаться от медленного соединения и попробовать следующее. - person Hao Wang; 30.07.2018