Я пытаюсь использовать кучу файлов с S3 в потоковом режиме, используя потоки akka:
S3.listBucket("<bucket>", Some("<common_prefix>"))
.flatMapConcat { r => S3.download("<bucket>", r.key) }
.mapConcat(_.toList)
.flatMapConcat(_._1)
.via(Compression.gunzip())
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
.map(_.utf8String)
.runForeach { x => println(x) }
Без увеличения akka.http.host-connection-pool.response-entity-subscription-timeout
получаю
java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response entity body or call discardBytes() on it.
для второго файла сразу после печати последней строки первого файла при попытке доступа к первой строке второго файла.
Я понимаю природу этого исключения. Я не понимаю, почему запрос ко второму файлу уже выполняется, а первый файл еще обрабатывается. Я предполагаю, что здесь задействована некоторая буферизация.
Есть идеи, как избавиться от этого исключения без необходимости увеличивать akka.http.host-connection-pool.response-entity-subscription-timeout
?
runReduce
в этих источниках эффективно буферизует данные в памяти, а это не то, что я хочу, и которые не передаются в потоковом режиме. @cchantep: Думаю, Benji S3 также буферизует весь файл в памяти? - person shagoon   schedule 05.06.2020.log("before download")
и.log("after download")
около первогоflatMapConcat
. Запрос второго файла отправляется сразу после ответа первого. Я считаю, что это неправильно. - person shagoon   schedule 05.06.2020