У меня есть упорядоченный поток данных
A A A B B C C C C ... (very long)
И я хочу преобразовать его в поток агрегатов в виде (элемент, количество):
(A, 3) (B, 2) (C, 4)
Какие операторы я могу использовать для этого в Akka Streams?
Source.fromPublisher(publisher)
.aggregateSomehow() // ?
.runWith(sink)
Я изучил .groupBy, но для этого требуется, чтобы я заранее знал количество категорий, чего я не знаю. Также я считаю, что он сохранит в памяти все группы, которых я хотел бы избежать. Я должен иметь возможность отказаться от (A, 3) после того, как он был обработан, и освободить ресурсы, которые он потребляет.
Изменить: Этот вопрос требует аналогичных функций, но с использованием подпотоков. Однако использование SubFlows, похоже, не требуется, потому что у меня есть решение, использующее комбинатор statefulMapConcat
.