Как я могу компоновать ресурсы в Scala, при этом правильно закрывая их с помощью scala-arm?

У меня есть класс, который берет локальный файл, преобразует его и сохраняет в GCS:

import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    if (destination.unzipGzip) {
      for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
           output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
        ByteStreams.copy(input, output)
      }
    } else if (destination.decompressBzip2) {
      for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        ByteStreams.copy(input, output)
      }
    } else {
      for (input <- managed(Files.newInputStream(localPath));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        IOUtils.copy(input, output)
      }
    }
  }

}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

Я пытаюсь удалить некоторое дублирование кода, в частности создание fileInputStream и gcsOutputStream. Но я не могу просто извлечь эти переменные в начале метода, потому что это создаст ресурсы за пределами блока scala-arm managed:

import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    // FIXME: creates a resource outside of the ARM block
    val fileInputStream = Files.newInputStream(localPath)
    val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))

    if (destination.unzipGzip) {
      unzipGzip(fileInputStream, gcsOutputStream)
    } else if (destination.decompressBzip2) {
      decompressBzip2(fileInputStream, gcsOutputStream)
    } else {
      copy(fileInputStream, gcsOutputStream)
    }
  }

  private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input ← managed(new ZipInputStream(inputStream));
         output ← managed(new GZIPOutputStream(outputStream))) {
      ByteStreams.copy(input, output)
    }
  }

  private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(new BZip2CompressorInputStream(inputStream));
         output <- managed(outputStream)) {
      ByteStreams.copy(input, output)
    }
  }

  private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(inputStream);
         output <- managed(outputStream)) {
      IOUtils.copy(input, output)
    }
  }
}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

Как видите, код намного понятнее и удобнее для тестирования, но ресурсы обрабатываются некорректно, поскольку они не «управляемы». Например, если при создании gcsOutputStream возникнет исключение, fileInputStream не будет закрыто.

Вероятно, я мог бы решить эту проблему, используя источники и приемники Google Guava, но мне интересно, есть ли это лучший способ сделать это на Scala, не вводя Guava. В идеале использовать стандартную библиотеку или функцию scala-arm, или, может быть, даже в Cats?

  • Должен ли я определять fileInputStream и gcsOutputStream как функции, которые ничего не принимают и возвращают поток? Кажется, код будет более подробным с () => InputStream и () => OutputStream везде?
  • Должен ли я использовать несколько «управляемых» scala-arm для понимания (один для определения fileInputStream и gcsOutputStream, а другой внутри каждой подфункции)? Если я это сделаю, не будет ли проблемой дважды закрыть «внутренний» поток ввода?
  • Есть ли чистый и "скалашный" подход к этому, которого я не вижу?

person Etienne Neveu    schedule 22.01.2020    source источник
comment
Вы используете Scala 2.13? Рассматривали ли вы возможность использования scala.util. Используете для некоторых (всех?) ваших ресурсов?   -  person jwvh    schedule 23.01.2020
comment
Очень интересно, я не знал об этой новой функции! К сожалению, мы все еще используем Scala 2.12, и было бы очень сложно перейти на Scala 2.13 из-за технического долга, так что, вероятно, через несколько месяцев... Когда мы это сделаем, я перенесу наш код scala-arm на Using. Есть ли чистый способ решить, что я пытаюсь сделать с Using, который недоступен с scala-arm? Если это так, это может быть интересным ответом на этот вопрос, и я думаю, что мог бы использовать источники/приемники Guava, пока мы не перенесемся.   -  person Etienne Neveu    schedule 23.01.2020
comment
Я бы рекомендовал использовать систему эффектов, обеспечивающую полное управление ресурсами. Как cats-effect Resource или zio Managed.   -  person Luis Miguel Mejía Suárez    schedule 23.01.2020
comment
Или вы можете взглянуть на Scala DSL для GCS, который предлагает потоковую передачу и управление ресурсами, например Benji ( Я соавтор)   -  person cchantep    schedule 23.01.2020
comment
@LuisMiguelMejíaSuárez: действительно, использование effect system может быть лучшим способом решить эту проблему. Я предполагаю, что это означало бы переход от нашего текущего стиля Java к более функциональному стилю программирования. Я собираюсь изучить эффекты котов/зио. Мне было бы очень интересно увидеть ответ, показывающий, как код будет выглядеть при таком подходе. @cchantep: большое спасибо за предложение Бенджи, я не знал об этом, и это выглядит очень интересно. К сожалению, я не могу добавить слишком много зависимостей, таких как Play/Akka, мы на самом деле пытаемся уменьшить количество зависимостей в этом проекте.   -  person Etienne Neveu    schedule 23.01.2020


Ответы (1)


Вы можете реорганизовать его следующим образом:

Сначала объявите управляемые ресурсы:

val fileInputStream: ManagedResource[InputStream] = managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] = managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))

Он не открывает эти ресурсы, это просто заявление о том, что вы хотите управлять этими ресурсами.

Затем вы можете использовать map, чтобы обернуть их в нужные декораторы (например, ZipInputStream):

if (destination.unzipGzip) {
  for (input ← fileInputStream.map(s => new ZipInputStream(s));
       output ← gcsOutputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
} else if (destination.decompressBzip2) {
  for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
       output <- gcsOutputStream) {
    ByteStreams.copy(input, output)
  }
} else {
  for (input <- fileInputStream;
       output <- gcsOutputStream) {
    IOUtils.copy(input, output)
  }
}

Конечно, ManagedResource[A] — это просто значение, поэтому вы даже можете передать его методу в качестве параметра:

private def unzipGzip(inputStream: Managed[InputStream], outputStream: Managed[OutputStream]): Unit = {
  for (input ← inputStream.map(s => new ZipInputStream(s));
       output ← outputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
}
person Krzysztof Atłasik    schedule 23.01.2020
comment
Это интересный подход, но обертка ZipInputStream / GZIPOutputStream / BZip2CompressorInputStream, к сожалению, не будет закрыта, только внутренняя fileInputStream/ gcsOutputStream . Это связано с тем, что метод scala-arm map() не управляет результатом операции сопоставления (это может быть что угодно, но не всегда ресурс-оболочка). Это не было бы проблемой, если бы ресурсы-обертки не имели состояния, но, к сожалению, это не так (например, нам нужно закрыть инфлятор ZipInputStream, чтобы закрыть связанные собственные ресурсы ZLIB). - person Etienne Neveu; 23.01.2020
comment
Таким образом, поведение будет похоже на наличие одного for-comprehension наверху, который управляет fileInputStream/gcsOutputStream, а затем оборачивать эти потоки в подметоды без использования дополнительных managed for-comprehensions. - person Etienne Neveu; 23.01.2020
comment
@EtienneNeveu Может быть, это можно сделать с помощью flatMap, я проверю и обновлю свой ответ. - person Krzysztof Atłasik; 23.01.2020