Как читать и обрабатывать файл фрагмент за фрагментом для каждого шага процесса с помощью Play Iteratees

Я использую Play Framework Iteratee для чтения файла. Я хотел бы обработать этот файл по частям (для каждого шага).

Я составляю следующие шаги:

  • groupByLines: Enumeratee[Array[Byte], List[String]]
  • turnIntoLines: Enumeratee[List[String], List[Line]] (я определил case class Line(number: Int, value: String))
  • parseChunk: Iteratee[List[Line], Try[List[T]]] (например, парсинг CSV)

Чтобы определить groupByLines, мне нужно использовать Iteratee.fold для объединения последней строки предыдущего фрагмента с первым фрагментом текущего фрагмента.

Проблема в том, что это создает один блок, содержащий всю строку файла.

Но я хотел бы обработать файл по частям. Я имею в виду, что groupByLines должен создавать фрагменты по 200 строк (например).

Та же проблема возникает с turnIntoLine. Я также использую fold для создания линии. Мне нужно использовать аккумулятор (предоставленный fold), чтобы заархивировать номер строки и содержимое строки.

Я новичок в игре iteratee.

Вот мой код:

val chunkSize = 1024 * 8

val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize)

def isLastChunk(chunk: Array[Byte]): Boolean = {
  chunk.length < chunkSize
}

val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped {
  println("groupByLines")
  Iteratee.fold[Array[Byte], (String, List[String])]("", List.empty) {
    case ((accLast, accLines), chunk) =>
      println("groupByLines chunk size " + chunk.length)
      new String(chunk)
        .trim
        .split("\n")
        .toList match {
        case lines  @ Cons(h, tail) =>
          val lineBetween2Chunks: String = accLast + h

          val goodLines =
            isLastChunk(chunk) match {
              case true  => Cons(lineBetween2Chunks, tail)
              case false => Cons(lineBetween2Chunks, tail).init
            }

          (lines.last, accLines ++ goodLines)
        case Nil => ("", accLines)
      }
  }.map(_._2)
}


val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped {
  println("turnIntoLines")
  Iteratee.fold[List[String], (Int, List[Line])](0, List.empty) {
    case ((index, accLines), chunk) =>
      println("turnIntoLines chunk size " + chunk.length)
      val lines =
        ((Stream from index) zip chunk).map {
          case (lineNumber, content) => Line(lineNumber, content)
        }.toList
      (index + chunk.length, lines ++ accLines)
  }.map(_._2)
}

person Dnomyar    schedule 26.09.2016    source источник


Ответы (1)


Проблема здесь в том, как обрабатывать файл построчно с помощью Play Iteratees.

Во-первых, чтобы прочитать файл с помощью UTF-8, я использовал:

object EnumeratorAdditionalOperators {
  implicit def enumeratorAdditionalOperators(e: Enumerator.type): EnumeratorAdditionalOperators = new EnumeratorAdditionalOperators(e)
}

class EnumeratorAdditionalOperators(e: Enumerator.type) {

  def fromUTF8File(file: File, chunkSize: Int = 1024 * 8): Enumerator[String] =
    e.fromFile(file, chunkSize)
      .map(bytes => new String(bytes, Charset.forName("UTF-8")))

}

Затем, чтобы разделить входной фрагмент на строки (вырезать в '\n'):

object EnumerateeAdditionalOperators {
  implicit def enumerateeAdditionalOperators(e: Enumeratee.type): EnumerateeAdditionalOperators = new EnumerateeAdditionalOperators(e)
}

class EnumerateeAdditionalOperators(e: Enumeratee.type) {

  def splitToLines: Enumeratee[String, String] = e.grouped(
    Traversable.splitOnceAt[String,Char](_ != '\n')  &>>
      Iteratee.consume()
  )

}

В-третьих, чтобы добавить номер строки, я использовал фрагмент кода, найденный здесь https://github.com/michaelahlers/michaelahlers-playful/blob/master/src/main/scala./ahlers/michael/playful/iteratee/EnumerateeFactoryOps.scala.

class EnumerateeAdditionalOperators(e: Enumeratee.type) {

  /**
    * As a complement to [[play.api.libs.iteratee.Enumeratee.heading]] and [[play.api.libs.iteratee.Enumeratee.trailing]], allows for inclusion of arbitrary elements between those from the producer.
    */
  def joining[E](separators: Enumerator[E])(implicit ec: ExecutionContext): Enumeratee[E, E] =
    zipWithIndex[E] compose Enumeratee.mapInputFlatten[(E, Int)] {

      case Input.Empty =>
        Enumerator.enumInput[E](Input.Empty)

      case Input.El((element, index)) if 0 < index =>
        separators andThen Enumerator(element)

      case Input.El((element, _)) =>
        Enumerator(element)

      case Input.EOF =>
        Enumerator.enumInput[E](Input.EOF)

    }

  /**
    * Zips elements with an index of the given [[scala.math.Numeric]] type, stepped by the given function.
    *
    * (Special thanks to [[https://github.com/eecolor EECOLOR]] for inspiring this factory with his answer to [[https://stackoverflow.com/a/27589990/700420 a question about enumeratees on Stack Overflow]].)
    */
  def zipWithIndex[E, I](first: I, step: I => I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] =
    e.scanLeft[E](null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) {
      case ((_, index), value) =>
        value -> step(index)
    }

  /**
    * Zips elements with an incrementing index of the given [[scala.math.Numeric]] type, adding one each time.
    */
  def zipWithIndex[E, I](first: I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = zipWithIndex(first, ev.plus(_, ev.one))

  /**
    * Zips elements with an incrementing index by the same contract [[scala.collection.GenIterableLike#zipWithIndex zipWithIndex]].
    */
  def zipWithIndex[E]: Enumeratee[E, (E, Int)] = zipWithIndex(0)

  // ...

}

Обратите внимание, что я определил имплициты для «добавления» методов к Enumerator и Enumeratee. Этот трюк позволяет написать, например, так: Enumerator.fromUTF8File(file).

Сложить все вместе:

case class Line(number: Int, value: String)


Enumerator.fromUTF8File(file) &>
Enumeratee.splitToLines ><>
Enumeratee.zipWithIndex ><> Enumeratee.map{
  case (e, idx) => Line(idx, e)
} // then an Iteratee or another Enumeratee

Новый код намного проще и лаконичнее, чем тот, что указан в вопросе.

person Dnomyar    schedule 19.12.2016