Мне нужно создать функцию со следующим интерфейсом:
import akka.kafka.scaladsl.Consumer.Control
object ItemConversionFlow {
def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
// Implementation goes here
}
Моя проблема в том, что я не знаю, как определить поток таким образом, чтобы он соответствовал описанному выше интерфейсу.
Когда я делаю что-то подобное
val flow = Flow[Item]
.map(item => doConversion(item)
.filter(_.isDefined)
.map(_.get)
результирующий тип — Flow[Item, OtherItem, NotUsed]. Я пока ничего не нашел в документации Akka. Кроме того, функции на akka.stream.scaladsl.Flow предлагают только «Не используется» вместо «Управление». Было бы здорово, если бы кто-то мог указать мне правильное направление.
Некоторая предыстория: мне нужно настроить несколько конвейеров, которые различаются только в части преобразования. Эти пайплайны являются подпотоками к основному потоку, который по какой-то причине может быть остановлен (соответствующее сообщение поступает в какую-то тему кафки). Поэтому мне нужна часть управления. Идея состоит в том, чтобы создать шаблон Graph, в который я просто вставлю упомянутый поток в качестве аргумента (фабрика, возвращающая его). Для конкретного случая у нас есть решение, которое работает. Чтобы обобщить, мне нужен такой поток.
mapMaterializedvalue
) или в пользовательской GraphStage, или при использовании GraphDSL - person Viktor Klang   schedule 17.01.2018