Сбой потока с ошибкой Завершение работы из-за нарушения спецификации реактивных потоков

Кажется, я никогда не смогу правильно обработать ошибки при использовании Akka Streams.

Итак, это мой код

var db = Database.forConfig("oracle")
var mysqlDb = Database.forConfig("mysql_read")
var mysqlDbWrite = Database.forConfig("mysql_write")

implicit val actorSystem = ActorSystem()
val decider : Supervision.Decider = {
  case _: Exception =>
      println("got an exception restarting connections")
     // let us restart our connections
     db.close()
     mysqlDb.close()
     mysqlDbWrite.close()
     db = Database.forConfig("oracle")
     mysqlDb = Database.forConfig("mysql_read")
     mysqlDbWrite = Database.forConfig("mysql_write")
     Supervision.Restart
}
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

и у меня такой поток

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(10){ foo =>
  try {
     val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long]
     mysqlDbWrite.run(existsQuery).map(v => (foo, v))
  } catch {
     case e: Throwable =>
        println(s"Lookup failed for ${foo}")
        throw e // will restart the stream
  }
}.collect {case (f, v) if v.isEmpty => f}

Таким образом, если foo уже существует в MySQL, запись больше не должна обрабатываться потоком.

Я надеялся, что с этим кодом, если что-то не удастся с поиском mysql (машина mysql довольно плохая, и тайм-ауты обычны), запись будет напечатана и отброшена, а поток продолжится с оставшимися записями благодаря надзору.

Когда я запускаю этот код. Я вижу такие ошибки, как

[error] (mysql_write network timeout executor) java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5576)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Invalid socket timeout value or state
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872)
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4852)
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Socket is closed
    at java.net.Socket.setSoTimeout(Socket.java:1137)
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

а также

[error] (mysql_write network timeout executor) java.lang.NullPointerException
java.lang.NullPointerException
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Что меня здесь удивляет, так это то, что эти исключения не происходят из моего блока catch. потому что я не вижу оператора println в моем блоке catch. Трассировка стека не показывает мне, откуда он произошел ... но поскольку он говорит mysql_write, я могу предположить, что это поток выше, потому что только этот поток использует mysql_write.

Наконец весь поток вылетает с ошибкой

[trace] Stack trace suppressed: run last compile:runMain for the full output.
flow has failed with error Shutting down because of violation of the Reactive Streams specification.
14:51:06,973 |-INFO in ch.qos.logback.classic.AsyncAppender[asyncKafkaAppender] - Worker thread will flush remaining events before exiting.
[success] Total time: 3480 s, completed Sep 26, 2017 2:51:07 PM
14:51:07,603 |-INFO in ch.qos.logback.core.hook.DelayingShutdownHook@2320545b - Sleeping for 1 seconds

Я не знаю, что я сделал, чтобы нарушить спецификацию реактивных потоков !!


person Knows Not Much    schedule 26.09.2017    source источник


Ответы (2)


Первым шагом к получению более предсказуемого решения было бы удаление поведения блокировки (Await.result) и использование mapAsync. Переписывание alreadyExistsFilter потока может быть таким:

  val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(3) { foo ⇒
    val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long]
    foo → Await.result(mysqlDbWrite.run(existsQuery), Duration.Inf)
  }.collect{
    case (foo, res) if res.isDefined ⇒ foo
  }

Более подробную информацию о блокировке в Akka можно найти в документы.

person Stefano Bonetti    schedule 26.09.2017
comment
Хорошо, проблема возникает снова. Я обновляю свой пост выше. - person Knows Not Much; 26.09.2017

Ответ Стефано правильный. Ошибка действительно возникла из-за блокировки кода в потоке.

Хотя моя первоначальная программа работала против scala 2.11, и даже после переключения на mapAsync проблема сохранялась.

Поскольку это инструмент командной строки, мне было легко переключиться на scala 2.12 и попробовать еще раз.

Когда я попробовал Scala 2.12, он работал отлично.

Мне очень помогло наличие "ch.qos.logback" % "logback-classic" % "1.2.3", в зависимостях. Это покажет вам каждый SQL-оператор, который выполняется, и легко поймет, что что-то не так.

person Knows Not Much    schedule 26.09.2017