Akka Streams, исходные элементы в качестве другого источника?

Я использую Alpakka-FTP, но, возможно, я ищу общий шаблон akka-stream. Соединитель FTP может отображать файлы или извлекать их:

def ls(host: String): Source[FtpFile, NotUsed]
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]]

В идеале я хотел бы создать такой поток:

LIST
  .FETCH_ITEM
  .FOREACH(do something)

Но я не могу создать такой поток с помощью двух функций, которые я написал выше. Я чувствую, что должен быть в состоянии добраться туда, используя Flow, что-то вроде

Ftp.ls
  .via(some flow that uses the Ftp.fromPath above)
  .runWith(Sink.foreach(do something))

Возможно ли это, учитывая только функции ls и fromPath выше?

ИЗМЕНИТЬ:

Я могу решить это, используя одного актера и mapAsync, но я все еще чувствую, что это должно быть более прямолинейно.

class Downloader extends Actor {
  override def receive = {
    case ftpFile: FtpFile =>
      Ftp.fromPath(Paths.get(ftpFile.path), settings)
        .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right)
        .run() pipeTo sender
  }
}

val downloader = as.actorOf(Props(new Downloader))

Ftp.ls("test_path", settings)
  .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult])
  .runWith(Sink.foreach(res => println("got it!" + res)))

person ticofab    schedule 22.01.2017    source источник


Ответы (1)


Вы должны иметь возможность использовать flatMapConcat для этой цели. Ваш конкретный пример может быть переписан на

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile =>
  Ftp.fromPath(Paths.get(ftpFile.path), settings)
}.runWith(FileIO.toPath(Paths.get("testHDF.txt")))

Документация здесь.

person Stefano Bonetti    schedule 22.01.2017