akka stream alpakka csv: пропустить исключение и проанализировать следующие строки

Я использую Alpakka для разбора файлов csv. версия "com.lightbend.akka" %% "akka-stream-alpakka-csv" % 0.20 У меня есть файл csv с незакрытой цитатой.

email
[email protected]
"[email protected]
[email protected]
[email protected]

Я хочу пропустить плохие ряды и перейти к следующему, но мой поток падает.

Я использую supervisorStrategy Supervision.Resume, но это не работает.

Поток терпит неудачу, когда находит незакрытую цитату.

Есть ли способ исправить это?

мой код:

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
  Source
    .single(csv)
    .map(ByteString.apply)
    .mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))

val csv = """email,country,name
            |"test,test,test
            |test,test,test
            |test,test,test
            |""".stripMargin

val source = hdfsSource(csv)

val decider: Supervision.Decider = {
  case _ ⇒ Supervision.Resume
}

val result = source
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println)

person Slavik Muz    schedule 29.11.2018    source источник


Ответы (1)


В настоящее время CsvParsing.lineScanner() не поддерживает стратегии контроля. Вы можете выбрать другой символ в качестве символа кавычек для сканера строк CsvParsing.lineScanner(quoteChar = '\''). Затем вы получите незакрытую двойную кавычку как часть проанализированных результатов:

Map(email -> "test, country -> test, name -> test) Map(email -> test, country -> test, name -> test) Map(email -> test, country -> test, name -> test)

person dvim    schedule 30.11.2018