Alpakka: как десериализовать xml в объекты. Есть ли более краткий шаблон для десериализаторов?

У меня есть сложные данные XML (они могут содержать много данных и могут иметь более 15 ГБ), которые имеют сложную природу с глубокой структурой. Нам нужна потоковая обработка для нашего огромного XML. Использование новой библиотеки Alpakka — наш первый выбор, поскольку это многообещающее решение.

Есть устаревшие потоки в сериализации scala-xml и другие библиотеки Scala, но нам нужно обрабатывать огромные объемы XML в виде потоков событий.

Чтобы упростить ситуацию, давайте представим, что у нас есть Заказ на покупку (XML взят из этого страница).

<?xml version="1.0"?>  
<PurchaseOrder PurchaseOrderNumber="99503" OrderDate="1999-10-20">  
  <Address Type="Shipping">  
    <Name>Ellen Adams</Name>  
    <Street>123 Maple Street</Street>  
    <City>Mill Valley</City>  
    <State>CA</State>  
    <Zip>10999</Zip>  
    <Country>USA</Country>  
  </Address>  
  <Address Type="Billing">  
    <Name>Tai Yee</Name>  
    <Street>8 Oak Avenue</Street>  
    <City>Old Town</City>  
    <State>PA</State>  
    <Zip>95819</Zip>  
    <Country>USA</Country>  
  </Address>  
  <DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>  
  <Items>  
    <Item PartNumber="872-AA">  
      <ProductName>Lawnmower</ProductName>  
      <Quantity>1</Quantity>  
      <USPrice>148.95</USPrice>  
      <Comment>Confirm this is electric</Comment>  
    </Item>  
    <Item PartNumber="926-AA">  
      <ProductName>Baby Monitor</ProductName>  
      <Quantity>2</Quantity>  
      <USPrice>39.98</USPrice>  
      <ShipDate>1999-05-21</ShipDate>  
    </Item>  
  </Items>  
</PurchaseOrder>  

Я пытаюсь передать все Item из XML и десериализовать их. Имейте в виду, что одни и те же теги могут отображаться на разных уровнях. Более того, элементы/атрибуты внутри Item могут появляться в произвольном порядке. Подход, который я вижу (в основном на основе Alpakka XmlProcessingTest — кто-нибудь может предложить лучшие ссылки?), может выглядеть следующим образом:

import akka.actor.ActorSystem
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.alpakka.xml.{Characters, EndElement, ParseEvent, StartElement}
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, IOResult}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import akka.util.ByteString

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.xml.{EndElement, ParseEvent, StartElement}

import scala.collection.mutable

trait Builder[T] {
  def build(): T
}

case class Item(partNumber: String)

object Item {
  def apply(builder: ItemBuilder,
            path: mutable.Stack[String]): PartialFunction[ParseEvent, Unit] = {
    case elem @ StartElement("Item", _, _, _, _) =>
      val partNumber = elem.findAttribute("PartNumber").map(_.value).getOrElse("")
      path.push(s"Item")
      builder.partNumber = partNumber
    case EndElement("Item") =>
      path.pop()
  }
}

class ItemBuilder() extends Builder[Item] {
  var partNumber = ""

  override def build(): Item =
    Item(
      partNumber = partNumber
    )

  def reset(): Unit = {
    partNumber = ""
  }
}


implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

val path: mutable.Stack[String] = new mutable.Stack[String]()

val xml =
  """<PurchaseOrder PurchaseOrderNumber="99503" OrderDate="1999-10-20">
    |<Address Type="Shipping">
    |  <Name>Ellen Adams</Name>
    |  <Street>123 Maple Street</Street>
    |  <City>Mill Valley</City>
    |  <State>CA</State>
    |  <Zip>10999</Zip>
    |  <Country>USA</Country>
    |</Address>
    |<Address Type="Billing">
    |  <Name>Tai Yee</Name>
    |  <Street>8 Oak Avenue</Street>
    |  <City>Old Town</City>
    |  <State>PA</State>
    |  <Zip>95819</Zip>
    |  <Country>USA</Country>
    |</Address>
    |<DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>
    |<Items>
    |  <Item PartNumber="872-AA">
    |    <ProductName>Lawnmower</ProductName>
    |    <Quantity>1</Quantity>
    |    <USPrice>148.95</USPrice>
    |    <Comment>Confirm this is electric</Comment>
    |  </Item>
    |  <Item PartNumber="926-AA">
    |    <ProductName>Baby Monitor</ProductName>
    |    <Quantity>2</Quantity>
    |    <USPrice>39.98</USPrice>
    |    <ShipDate>1999-05-21</ShipDate>
    |  </Item>
    |</Items>
    |</PurchaseOrder>""".stripMargin

val documentStream = Source.single(xml)

val builder = new ItemBuilder()

val default: PartialFunction[ParseEvent, Unit] = {
  case Characters(any) =>
  case StartElement(localName, _, _, _, _) =>
    path.push(localName)
  case EndElement(localName) =>
    path.pop()
  case any =>
}

val handle: PartialFunction[ParseEvent, Unit] = Item(builder, path) orElse
  default

val source: Source[Item, akka.NotUsed] = documentStream
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .splitWhen(_ match {
    case StartElement("Item", _, _, _, _) =>
      true
    case _ =>
      false
  })
  .fold[ItemBuilder](new ItemBuilder()) {
  case (_, parseEvent) =>
    handle(parseEvent)
    builder
}
  .map { builder: ItemBuilder =>
    val item = builder.build()
    builder.reset()
    item
  }
  .concatSubstreams
  .filterNot(_.partNumber.isEmpty)

val resultFuture: Future[Seq[Item]] = source
  .runWith(Sink.seq)

val result: Seq[Item] = Await.result(resultFuture, 5.seconds)

println("items : " + result)
println("END")

Пример размещен на Scastie.

Для этого подхода требуется множество обработчиков для каждого тега (val handle: PartialFunction), которые могут быть подвержены ошибкам и слишком хрупки.

Мне интересно, как более лаконично обрабатывать ParseEvent и объединять их в необходимые объекты Item. Любое предложение, как избежать стандартного кода? Есть ли более лаконичный шаблон для десериализаторов?


person sergiusz.kierat    schedule 16.02.2018    source источник
comment
может помочь потоковый выборочный анализатор: github.com/Tradeshift/ts-reaktive/blob/master/ Я должен это проверить.   -  person sergiusz.kierat    schedule 20.02.2018


Ответы (1)


Я немного подчистил ваш код и расширил конструктор, чтобы проиллюстрировать, как он может на самом деле создавать объекты после серии событий подэлементов. Может быть, другие смогут улучшить мою версию.

Я предлагаю разделить поток после разбора элемента. Таким образом, вы можете продолжать использовать построитель синглтона и простой стек путей.

Вы можете добавить аналогичные обработчики и компоновщики для любых гипотетических сложных поддеревьев Item; единственное отличие состоит в том, что результат SubElement.build() присваивается некоторому атрибуту ItemBuilder, а не возвращается.

Если вам нужно обратиться к подэлементам с тем же именем, вы можете либо заглянуть глубже в стек пути, либо добавить дополнительное отслеживание состояния.

import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.xml.{Characters, EndElement, ParseEvent, StartElement}

import scala.collection.mutable

case class Item(partNumber: String, productName: String)

object Item {
  def apply(path: mutable.Stack[String]): PartialFunction[ParseEvent, Any] = {
    case elem @ StartElement("Item", _, _, _, _) =>
      path.push("Item")
      ItemBuilder.reset()
      ItemBuilder.partNumber = elem.findAttribute("PartNumber").map(_.value).getOrElse("")
    case Characters(text) =>
      path.top match {
        case "ProductName" => ItemBuilder.productName = text
        case _ => ()
      }
    case EndElement("Item") =>
      path.pop()
      ItemBuilder.build()
  }
}

object ItemBuilder {
  var partNumber = ""
  var productName = ""

  def build(): Item =
    Item(
      partNumber = partNumber,
      productName = productName)

  def reset(): Unit = {
    partNumber = ""
    productName = ""
  }
}

object AlpakkaDemo extends App {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val path: mutable.Stack[String] = new mutable.Stack[String]()

  val xml =
    """<PurchaseOrder PurchaseOrderNumber="99503" OrderDate="1999-10-20">
      |<Address Type="Shipping">
      |  <Name>Ellen Adams</Name>
      |  <Street>123 Maple Street</Street>
      |  <City>Mill Valley</City>
      |  <State>CA</State>
      |  <Zip>10999</Zip>
      |  <Country>USA</Country>
      |</Address>
      |<DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>
      |<Items>
      |  <Item PartNumber="872-AA">
      |    <ProductName>Lawnmower</ProductName>
      |    <Quantity>1</Quantity>
      |    <USPrice>148.95</USPrice>
      |    <Comment>Confirm this is electric</Comment>
      |  </Item>
      |  <Item PartNumber="926-AA">
      |    <ProductName>Baby Monitor</ProductName>
      |    <Quantity>2</Quantity>
      |    <USPrice>39.98</USPrice>
      |    <ShipDate>1999-05-21</ShipDate>
      |  </Item>
      |</Items>
      |</PurchaseOrder>""".stripMargin

  val defaultElementHandler: PartialFunction[ParseEvent, Any] = {
    case StartElement(localName, _, _, _, _) => path.push(localName)
    case EndElement(localName) => path.pop()
    case _ => ()
  }

  val handlersChain: PartialFunction[ParseEvent, Any] =
    Item(path) orElse
    defaultElementHandler

  val source: Source[Item, akka.NotUsed] = Source.single(xml)
    .map(ByteString(_))
    .via(XmlParsing.parser)
    .map(handlersChain)
    .collect {
      case item: Item => item
    }.splitWhen(_ => true)  // also consider mapAsyncUnordered()
    .map {
      identity  // placeholder for any subsequent heavy Item processing
    }
    .concatSubstreams

  val resultFuture: Future[Seq[Item]] = source.runWith(Sink.seq)

  val result: Seq[Item] = Await.result(resultFuture, 5.seconds)

  println("items : " + result)
  println("END")
  system.terminate()
}

Вот также несколько (очень отдаленно) связанных статей, которые могут вдохновить вас на новые идеи по организации парсинга на основе событий.

http://www.ficksworkshop.com/blog/post/design-pattern-for-event-based-parsing-of-hierarchical-data

https://www.xml.com/pub/a/2003/09/17/stax.html

https://www.infoq.com/articles/HIgh-Performance-Parsers-in-Java-V2

person Valentyn Danylchuk    schedule 22.02.2018