Потоки Akka уменьшаются до меньшего потока

У меня есть упорядоченный поток данных

A A A B B C C C C ... (very long)

И я хочу преобразовать его в поток агрегатов в виде (элемент, количество):

(A, 3) (B, 2) (C, 4)

Какие операторы я могу использовать для этого в Akka Streams?

Source.fromPublisher(publisher)
    .aggregateSomehow()  // ?
    .runWith(sink)

Я изучил .groupBy, но для этого требуется, чтобы я заранее знал количество категорий, чего я не знаю. Также я считаю, что он сохранит в памяти все группы, которых я хотел бы избежать. Я должен иметь возможность отказаться от (A, 3) после того, как он был обработан, и освободить ресурсы, которые он потребляет.

Изменить: Этот вопрос требует аналогичных функций, но с использованием подпотоков. Однако использование SubFlows, похоже, не требуется, потому что у меня есть решение, использующее комбинатор statefulMapConcat.


person Andrejs    schedule 10.09.2017    source источник
comment
comment
@chunjef Спасибо за указатель! Это, безусловно, связано, хотя не уверен, что это дубликат. Кажется, SubFlows не обязательно требуются.   -  person Andrejs    schedule 10.09.2017


Ответы (1)


Одним из вариантов является использование statefulMapConcat:

Source(List("A", "A", "B", "B", "B", "C", "C", ""))
      .statefulMapConcat({ () =>
        var lastChar = ""
        var count = 0

        char => if(lastChar == char) {
            count += 1
            List.empty
          } else {
            val charCount = (lastChar, count)
            lastChar = char
            count = 1
            List(charCount)
          }
      })
    .runForeach(println)

Однако для этого требовалось добавить элемент во входной поток, чтобы отметить конец.

Выход:

(,0)
(A,2)
(B,3)
(C,2)

Спасибо @chunjef за предложение в комментариях

person Andrejs    schedule 10.09.2017