Alpakka — чтение Крио-сериализованных объектов из S3

У меня есть Kryo-сериализованные двоичные данные, хранящиеся на S3 (тысячи сериализованных объектов).

Alpakka позволяет читать содержимое как data: Source[ByteString, NotUsed]. Но формат Kryo не использует разделители, поэтому я не могу разделить каждый сериализованный объект на отдельные ByteString с помощью data.via(Framing.delimiter(...)).

Таким образом, Kryo на самом деле нужно читать данные, чтобы понять, когда объект заканчивается, и это не выглядит удобным для потоковой передачи.

Можно ли реализовать этот случай в потоковом режиме, чтобы я получал Source[MyObject, NotUsed] в конце дня?


person Tvaroh    schedule 22.11.2017    source источник


Ответы (1)


Вот этап графа, который делает это. Он обрабатывает случай, когда сериализованный объект охватывает две строки байтов. Его нужно улучшить, когда объекты большие (не мой вариант использования) и могут занимать более двух байтовых строк в Source[ByteString, NotUsed].

object KryoReadStage {
  def flow[T](kryoSupport: KryoSupport,
              `class`: Class[T],
              serializer: Serializer[_]): Flow[ByteString, immutable.Seq[T], NotUsed] =
    Flow.fromGraph(new KryoReadStage[T](kryoSupport, `class`, serializer))
}

final class KryoReadStage[T](kryoSupport: KryoSupport,
                             `class`: Class[T],
                             serializer: Serializer[_])
  extends GraphStage[FlowShape[ByteString, immutable.Seq[T]]] {

  override def shape: FlowShape[ByteString, immutable.Seq[T]] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {

      setHandler(in, new InHandler {

        override def onPush(): Unit = {
          val bytes =
            if (previousBytes.length == 0) grab(in)
            else ByteString.fromArrayUnsafe(previousBytes) ++ grab(in)

          Managed(new Input(new ByteBufferBackedInputStream(bytes.asByteBuffer))) { input =>
            var position = 0
            val acc = ListBuffer[T]()

            kryoSupport.withKryo { kryo =>
              var last = false

              while (!last && !input.eof()) {
                tryRead(kryo, input) match {
                  case Some(t) =>
                    acc += t
                    position = input.total().toInt
                    previousBytes = EmptyArray
                  case None =>
                    val bytesLeft = new Array[Byte](bytes.length - position)

                    val bb = bytes.asByteBuffer
                    bb.position(position)
                    bb.get(bytesLeft)

                    last = true
                    previousBytes = bytesLeft
                }
              }

              push(out, acc.toList)
            }
          }
        }

        private def tryRead(kryo: Kryo, input: Input): Option[T] =
          try {
            Some(kryo.readObject(input, `class`, serializer))
          } catch {
            case _: KryoException => None
          }

      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })

      private val EmptyArray: Array[Byte] = Array.empty

      private var previousBytes: Array[Byte] = EmptyArray

    }
  }

  override def toString: String = "KryoReadStage"

  private lazy val in: Inlet[ByteString] = Inlet("KryoReadStage.in")
  private lazy val out: Outlet[immutable.Seq[T]] = Outlet("KryoReadStage.out")

}

Пример использования:

client.download(BucketName, key)
  .via(KryoReadStage.flow(kryoSupport, `class`, serializer))
  .flatMapConcat(Source(_))

Он использует несколько дополнительных помощников ниже.

ByteBufferBackedInputStream:

class ByteBufferBackedInputStream(buf: ByteBuffer) extends InputStream {

  override def read: Int = {
    if (!buf.hasRemaining) -1
    else buf.get & 0xFF
  }

  override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
    if (!buf.hasRemaining) -1
    else {
      val read = Math.min(len, buf.remaining)
      buf.get(bytes, off, read)
      read
    }
  }

}

Управляется:

object Managed {

  type AutoCloseableView[T] = T => AutoCloseable

  def apply[T: AutoCloseableView, V](resource: T)(op: T => V): V =
    try {
      op(resource)
    } finally {
      resource.close()
    }
}

КриоПоддержка:

trait KryoSupport {
  def withKryo[T](f: Kryo => T): T
}

class PooledKryoSupport(serializers: (Class[_], Serializer[_])*) extends KryoSupport {

  override def withKryo[T](f: Kryo => T): T = {
    pool.run(new KryoCallback[T] {
      override def execute(kryo: Kryo): T = f(kryo)
    })
  }

  private val pool = {
    val factory = new KryoFactory() {
      override def create(): Kryo = {
        val kryo = new Kryo

        (KryoSupport.ScalaSerializers ++ serializers).foreach {
          case ((clazz, serializer)) =>
            kryo.register(clazz, serializer)
        }

        kryo
      }
    }

    new KryoPool.Builder(factory).softReferences().build()
  }

}
person Tvaroh    schedule 22.11.2017