Из документации Akka-stream видно, что все параметры слияния потоков (merge, mergeSorted, mergePreferred, zipN, zipWithN) работают, ожидая, когда все объединенные потоки будут готовы к новому элементу, а затем применяя стратегию слияния (объединение элементов в кортеж, или применяя функцию zip и т. д.)
Это хорошо подходит для автономной обработки (например, чтения данных из файлов или HTTP и их объединения), но приводит к задержке при онлайн-обработке. Мне нужно объединить потоки данных, созданные, например. несколько соединений Websocket и доставлять обновления в объединенном потоке, как только любой из исходных потоков создает значение. Пример: если есть исходные потоки A и B, вот что должно быть в объединенном потоке:
Выходной поток начинается с некоторого начального значения, например. (None, None)
.
(A:1) (B:<not ready>) -> (Some(1), None)
(A:2) (B:<not ready>) -> (Some(2), None)
(A:3) (B:1) -> (Some(3), Some(1))
(A:3) (B:2) -> (Some(3), Some(2))
и т. д. Опять же, новое значение появляется в выходном потоке, когда любой исходный поток создает значение немедленно.
Есть ли какой-нибудь комбинатор для достижения этого?