У меня есть класс, который берет локальный файл, преобразует его и сохраняет в GCS:
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
if (destination.unzipGzip) {
for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
ByteStreams.copy(input, output)
}
} else if (destination.decompressBzip2) {
for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
ByteStreams.copy(input, output)
}
} else {
for (input <- managed(Files.newInputStream(localPath));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
IOUtils.copy(input, output)
}
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)
Я пытаюсь удалить некоторое дублирование кода, в частности создание fileInputStream
и gcsOutputStream
. Но я не могу просто извлечь эти переменные в начале метода, потому что это создаст ресурсы за пределами блока scala-arm managed
:
import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
// FIXME: creates a resource outside of the ARM block
val fileInputStream = Files.newInputStream(localPath)
val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))
if (destination.unzipGzip) {
unzipGzip(fileInputStream, gcsOutputStream)
} else if (destination.decompressBzip2) {
decompressBzip2(fileInputStream, gcsOutputStream)
} else {
copy(fileInputStream, gcsOutputStream)
}
}
private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input ← managed(new ZipInputStream(inputStream));
output ← managed(new GZIPOutputStream(outputStream))) {
ByteStreams.copy(input, output)
}
}
private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(new BZip2CompressorInputStream(inputStream));
output <- managed(outputStream)) {
ByteStreams.copy(input, output)
}
}
private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(inputStream);
output <- managed(outputStream)) {
IOUtils.copy(input, output)
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)
Как видите, код намного понятнее и удобнее для тестирования, но ресурсы обрабатываются некорректно, поскольку они не «управляемы». Например, если при создании gcsOutputStream
возникнет исключение, fileInputStream
не будет закрыто.
Вероятно, я мог бы решить эту проблему, используя источники и приемники Google Guava, но мне интересно, есть ли это лучший способ сделать это на Scala, не вводя Guava. В идеале использовать стандартную библиотеку или функцию scala-arm, или, может быть, даже в Cats
?
- Должен ли я определять
fileInputStream
иgcsOutputStream
как функции, которые ничего не принимают и возвращают поток? Кажется, код будет более подробным с() => InputStream
и() => OutputStream
везде? - Должен ли я использовать несколько «управляемых» scala-arm для понимания (один для определения
fileInputStream
иgcsOutputStream
, а другой внутри каждой подфункции)? Если я это сделаю, не будет ли проблемой дважды закрыть «внутренний» поток ввода? - Есть ли чистый и "скалашный" подход к этому, которого я не вижу?
scala-arm
наUsing
. Есть ли чистый способ решить, что я пытаюсь сделать сUsing
, который недоступен сscala-arm
? Если это так, это может быть интересным ответом на этот вопрос, и я думаю, что мог бы использовать источники/приемники Guava, пока мы не перенесемся. - person Etienne Neveu   schedule 23.01.2020Resource
или zioManaged
. - person Luis Miguel Mejía Suárez   schedule 23.01.2020effect system
может быть лучшим способом решить эту проблему. Я предполагаю, что это означало бы переход от нашего текущего стиля Java к более функциональному стилю программирования. Я собираюсь изучить эффекты котов/зио. Мне было бы очень интересно увидеть ответ, показывающий, как код будет выглядеть при таком подходе. @cchantep: большое спасибо за предложение Бенджи, я не знал об этом, и это выглядит очень интересно. К сожалению, я не могу добавить слишком много зависимостей, таких как Play/Akka, мы на самом деле пытаемся уменьшить количество зависимостей в этом проекте. - person Etienne Neveu   schedule 23.01.2020