Использование запросов Python и, возможно, concurrent.futures для обработки данных в реальном времени.

У меня есть поток данных в реальном времени с использованием requests:

request = requests.get($URL,stream=True)
stream = request.iter_lines()

Мы можем предположить, что next(stream) возвращает что-то вроде

"alice"
"charlie"
"bob"
"charlie"
...

Каждые n секунд я хочу собирать все данные. Затем я хочу использовать data в качестве параметра для некоторого алгоритма foo, который занимает ненулевое время. Предположим, что foo является отдельным и никак не влияет на stream.

Если бы foo выполнялось тривиально, или если бы меня не заботили отсутствующие входные данные из stream, это было бы легко:

while True:
    time0 = time.time() + n
    data = {}
    for name in stream:
        data[name] = data.get(name,0) + 1
        if time.time() > time0:
            break
    foo(data) #could take a while

Но я не хочу пропустить ни одной записи stream. Я также хочу предотвратить ошибки, если foo займет больше n секунд. Как мне этого добиться? Я предполагаю, что мне нужно использовать concurrent.futures, но, возможно, это возможно с помощью другого метода.

Редактировать: временное разрешение stream очень маленькое, с десятками выходов в секунду. Это отличает мою проблему от аналогичных вопросов, которые предполагают более грубое разрешение, скажем, 1 вывод в секунду.


person ant11    schedule 05.05.2021    source источник
comment
Если foo занимает больше n секунд, вы облажались. Независимо от того, как вы его обрабатываете, вы будете отставать все дальше и дальше, пока у Python не закончится память. Одним из возможных решений является размещение вашего requests материала в другом потоке и передача его вывода в queue, из которого считывается ваш цикл обработки. Помните, однако, что если foo является чистым кодом Python, PIL будет мешать веб-запросам.   -  person Tim Roberts    schedule 12.05.2021
comment
В настоящее время меня не слишком беспокоит случай, когда foo занимает больше n секунд. Как мне настроить очередь, чтобы эти две вещи произошли?   -  person ant11    schedule 12.05.2021
comment
Вы используете Queue.put, чтобы добавить что-то в очередь, и Queue.get, чтобы что-то удалить. Любой объект. Это гарантированно потокобезопасно. Queue.get будет блокироваться, если там ничего нет, так что вы тоже получите синхронизацию.   -  person Tim Roberts    schedule 12.05.2021
comment
Я попытался адаптировать два метода из этого сообщения: stackoverflow.com/questions/41648103 Однако они, похоже, не работают, так как мои задачи зависят от набора/объединения выходных данных, а не от одного. Я не уверен, какие модификации сделать.   -  person ant11    schedule 12.05.2021
comment
Ну, если вам надо привести результаты в порядок, то неясно, сколько распараллеливания вы реально сможете сделать. Вы можете извлечь результаты из очереди и отсортировать их по корзинам, а когда у вас будет полная коллекция, обработать ее. Может быть.   -  person Tim Roberts    schedule 12.05.2021