TimeoutException при использовании файлов из S3 с потоками akka

Я пытаюсь использовать кучу файлов с S3 в потоковом режиме, используя потоки akka:

S3.listBucket("<bucket>", Some("<common_prefix>"))
  .flatMapConcat { r => S3.download("<bucket>", r.key) }
  .mapConcat(_.toList)
  .flatMapConcat(_._1)
  .via(Compression.gunzip())
  .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
  .map(_.utf8String)
  .runForeach { x => println(x) }

Без увеличения akka.http.host-connection-pool.response-entity-subscription-timeout получаю

java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response entity body or call discardBytes() on it. для второго файла сразу после печати последней строки первого файла при попытке доступа к первой строке второго файла.

Я понимаю природу этого исключения. Я не понимаю, почему запрос ко второму файлу уже выполняется, а первый файл еще обрабатывается. Я предполагаю, что здесь задействована некоторая буферизация.

Есть идеи, как избавиться от этого исключения без необходимости увеличивать akka.http.host-connection-pool.response-entity-subscription-timeout?


person shagoon    schedule 05.06.2020    source источник
comment
Вы можете проверить, есть ли у вас такая же проблема с Benji S3 (Scala DSL для S3 / GCP /. .. Я соавтор)   -  person cchantep    schedule 05.06.2020
comment
Попробуй это? https://doc.akka.io/docs/akka-http/current/implications-of-streaming-http-entity.html#integrating-with-akka-streams   -  person Yik San Chan    schedule 05.06.2020
comment
Спасибо за ваши комментарии / предложения. @YikSanChan: Я думаю (надеюсь), что документация вводит в заблуждение. Для больших файлов вы просто не можете использовать поток за секунду. Я думаю, что на самом деле имеется в виду, что у вас не должно быть пауз длиннее настроенного тайм-аута между извлечением элементов из этого потока. Использование runReduce в этих источниках эффективно буферизует данные в памяти, а это не то, что я хочу, и которые не передаются в потоковом режиме. @cchantep: Думаю, Benji S3 также буферизует весь файл в памяти?   -  person shagoon    schedule 05.06.2020
comment
Я добавил .log("before download") и .log("after download") около первого flatMapConcat. Запрос второго файла отправляется сразу после ответа первого. Я считаю, что это неправильно.   -  person shagoon    schedule 05.06.2020


Ответы (1)


Вместо объединения обработки загруженных файлов в один поток с помощью flatMapConcat вы можете попытаться материализовать поток во внешнем потоке и полностью обработать его там, прежде чем отправлять выходные данные ниже по потоку. Тогда вам не следует начинать загрузку (и полную обработку) следующего объекта, пока вы не будете готовы.

Как правило, вы хотите избежать слишком большого количества потоковых материализаций, чтобы уменьшить накладные расходы, но я подозреваю, что это будет незначительным для приложения, выполняющего сетевой ввод-вывод, подобный этому.

Сообщите мне, работает ли что-то вроде этого: (предупреждение: непроверено)

S3.listBucket("<bucket>", Some("<common_prefix>"))
  .mapAsync(1) { result =>
    val contents = S3.download("<bucket>", r.key)
      .via(Compression.gunzip())
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
      .map(_.utf8String)
      .to(Sink.seq)(Keep.right)
      .run()
    contents     
  }
  .mapConcat(identity)
  .runForeach { x => println(x) }
person Sean Glover    schedule 08.06.2020