У меня есть поток данных в реальном времени с использованием requests
:
request = requests.get($URL,stream=True)
stream = request.iter_lines()
Мы можем предположить, что next(stream)
возвращает что-то вроде
"alice"
"charlie"
"bob"
"charlie"
...
Каждые n
секунд я хочу собирать все данные. Затем я хочу использовать data
в качестве параметра для некоторого алгоритма foo
, который занимает ненулевое время. Предположим, что foo
является отдельным и никак не влияет на stream
.
Если бы foo
выполнялось тривиально, или если бы меня не заботили отсутствующие входные данные из stream
, это было бы легко:
while True:
time0 = time.time() + n
data = {}
for name in stream:
data[name] = data.get(name,0) + 1
if time.time() > time0:
break
foo(data) #could take a while
Но я не хочу пропустить ни одной записи stream
. Я также хочу предотвратить ошибки, если foo
займет больше n
секунд. Как мне этого добиться? Я предполагаю, что мне нужно использовать concurrent.futures
, но, возможно, это возможно с помощью другого метода.
Редактировать: временное разрешение stream
очень маленькое, с десятками выходов в секунду. Это отличает мою проблему от аналогичных вопросов, которые предполагают более грубое разрешение, скажем, 1 вывод в секунду.
foo
занимает большеn
секунд, вы облажались. Независимо от того, как вы его обрабатываете, вы будете отставать все дальше и дальше, пока у Python не закончится память. Одним из возможных решений является размещение вашегоrequests
материала в другом потоке и передача его вывода вqueue
, из которого считывается ваш цикл обработки. Помните, однако, что еслиfoo
является чистым кодом Python, PIL будет мешать веб-запросам. - person Tim Roberts   schedule 12.05.2021foo
занимает больше n секунд. Как мне настроить очередь, чтобы эти две вещи произошли? - person ant11   schedule 12.05.2021Queue.put
, чтобы добавить что-то в очередь, иQueue.get
, чтобы что-то удалить. Любой объект. Это гарантированно потокобезопасно.Queue.get
будет блокироваться, если там ничего нет, так что вы тоже получите синхронизацию. - person Tim Roberts   schedule 12.05.2021