Реализация пользовательского источника потоков Akka на основе ActorPublisher

Я хотел бы реализовать пользовательский Source[ByteSting] в Akka Stream. Этот источник должен просто читать данные из предоставленного файла и в пределах предоставленного диапазона байтов и распространять их вниз по потоку.

Сначала я подумал, что это можно сделать, реализовав Actor, который смешивается с ActorPublisher. Эта реализация аналогична akka.stream.impl.io.FilePublisher, которая считывает весь файл по указанному пути, а не только данные из заданного диапазона байтов:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

import akka.actor.{ActorLogging, DeadLetterSuppression, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.util.ByteString

import scala.annotation.tailrec
import scala.util.control.NonFatal

class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString]
  with ActorLogging{

  import FilePublisher._

  private val chunksToBuffer = 10
  private var bytesLeftToRead = endByte - startByte + 1
  private var fileChannel: FileChannel = _
  private val buffer = ByteBuffer.allocate(8096)

  private var bufferedChunks: Vector[ByteString] = _

  override def preStart(): Unit = {
    try {
      log.info("Starting")
      fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ)
      bufferedChunks = readAhead(Vector.empty, Some(startByte))
      log.info("Chunks {}", bufferedChunks)
    } catch {
      case NonFatal(ex) => onErrorThenStop(ex)
    }
  }

  override def postStop(): Unit = {

    log.info("Stopping")
    if (fileChannel ne null)
      try fileChannel.close() catch {
        case NonFatal(ex) => log.error(ex, "Error during file channel close")
    }
  }

  override def receive: Receive = {
    case Request =>
      readAndSignalNext()
      log.info("Got request")
    case Continue =>
      log.info("Continuing reading")
      readAndSignalNext()
    case Cancel =>
      log.info("Cancel message got")
      context.stop(self)
  }

  private def readAndSignalNext() = {

    log.info("Reading and signaling")
    if (isActive) {
      bufferedChunks = readAhead(signalOnNext(bufferedChunks), None)
      if (isActive && totalDemand > 0) self ! Continue
    }
  }

  @tailrec
  private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = {

    if (chunks.nonEmpty && totalDemand > 0) {
      log.info("Signaling")
      onNext(chunks.head)
      signalOnNext(chunks.tail)
    } else {
      if (chunks.isEmpty && bytesLeftToRead > 0) {
        onCompleteThenStop()
      }
      chunks
    }
  }

  @tailrec
  private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = {

    if (currentlyBufferedChunks.size < chunksToBuffer) {

      val bytesRead = readDataFromChannel(startPosition)
      log.info("Bytes read {}", bytesRead)
      bytesRead match {
        case Int.MinValue => Vector.empty
        case -1 =>
          log.info("EOF reached")
          currentlyBufferedChunks // EOF reached
        case _ =>
          buffer.flip()
          val chunk = ByteString(buffer)
          buffer.clear()

          bytesLeftToRead -= bytesRead
          val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt)
          readAhead(currentlyBufferedChunks :+ trimmedChunk, None)
      }

    } else {
      currentlyBufferedChunks
    }
  }

  private def readDataFromChannel(startPosition: Option[Long]): Int = {
    try {
      startPosition match {
        case Some(position) => fileChannel.read(buffer, position)
        case None => fileChannel.read(buffer)
      }
    } catch {
      case NonFatal(ex) =>
        log.error(ex, "Got error reading data from file channel")
        Int.MinValue
    }
  }
}

object FilePublisher {

  private case object Continue extends DeadLetterSuppression

  def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte)
}

Но оказывается, что когда я материализую Source при поддержке моего FilePublisher вот так:

val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength))
val future = fileSource.runWith(Sink.seq) 

ничего не происходит, и источник не передает данные дальше по потоку.

Есть ли другой правильный способ материализовать Source на основе моего FilePublisher, или мне не следует использовать этот API и просто реализовать настраиваемый этап обработки, как описано здесь?

Проблема с подходом CustomStage заключается в том, что его тривиальная реализация будет выполнять ввод-вывод сразу на этом этапе. Думаю, я мог бы переместить ввод-вывод со сцены в пользовательский пул потоков или актера, но для этого потребуется некоторая форма синхронизации между сценой и актером. Спасибо.


person thereisnospoon    schedule 10.03.2017    source источник


Ответы (2)


Я заметил, что в настоящее время вы не используете отдельный диспетчер для операций ввода-вывода. Вот раздел документации, объясняющий, почему невыполнение этого может привести к неприятной блокировке в вашем приложении.

Akka Streams оборачивает FilePublisher в FileSource с помощью специального диспетчера на основе пула потоков. Вы можете вдохновиться их кодом здесь.

person Stefano Bonetti    schedule 10.03.2017
comment
Спасибо, это верный момент. Я планировал в будущем добавить использование отдельного диспетчера. - person thereisnospoon; 10.03.2017

Проблема была вызвана ошибкой при сопоставлении с образцом метода receive: эта строка case Request => должна быть вместо case Request(_), потому что Request на самом деле является классом case с одним параметром (final case class Request(n: Long)), а не объектом case, как я думал.

person thereisnospoon    schedule 13.03.2017