Как писать документы в MongoDB, используя транзакции ReactiveMongo?

Я пытаюсь запустить этот простой код, который вставляет документ в коллекцию «отчетов» по ​​транзакции.

Это код:

def runWithTransaction(): Future[Unit] = {
    for {
      dbWithSession <- db.db.startSession()
      dbWithTx <- dbWithSession.startTransaction(None)

      reportsCollection = dbWithTx.collection[JSONCollection]("reports")
      _ <- reportsCollection.insert.one(BSONDocument("id" -> "123123"))

      _ <- dbWithTx.commitTransaction()
      _ <- dbWithSession.endSession()
    } yield ()
  }

Я получаю следующую ошибку:

play.api.libs.json.JsResultException: JsResultException(errors:List((,List(JsonValidationError(List(CommandError[code=14, errmsg=BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long', doc: {"operationTime":{"$time":1573135213,"$i":1,"$timestamp":{"t":1573135213,"i":1}},"ok":0,"errmsg":"BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long'","code":14,"codeName":"TypeMismatch","$clusterTime":{"clusterTime":{"$time":1573135213,"$i":1,"$timestamp":{"t":1573135213,"i":1}},"signature":{"hash":{"$binary":"0000000000000000000000000000000000000000","$type":"00"},"keyId":0}}}]),WrappedArray())))))
    at reactivemongo.play.json.JSONSerializationPack$.deserialize(JSONSerializationPack.scala:61)
    at reactivemongo.play.json.JSONSerializationPack$.deserialize(JSONSerializationPack.scala:33)
    at reactivemongo.api.commands.Command$$anon$2.$anonfun$one$6(commands.scala:141)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:92)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:92)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Я использую:

  • скала 2.12
  • "org.reactivemongo" % "play2-reactivemongo_2.12" % "0.18.8-play27"
  • MongoDB версии 4.2.1 (на самом деле это набор реплик с 1 основным)

Любая помощь приветствуется, спасибо.


person Ameen    schedule 07.11.2019    source источник
comment
Зачем использовать JSONCollection с документом BSON?   -  person cchantep    schedule 07.11.2019
comment
Для тех, кто сталкивается с той же проблемой, ответ (для меня) находится в комментарии выше (т.е. используйте BSONCollection вместо JSONCollection). Не уверен, почему транзакции взрываются с трассировкой стека, показанной в вопросе OP, при использовании JSONCollection, но переключение на BSONCollection решило эту проблему.   -  person virtualeyes    schedule 22.11.2020
comment
@virtualeyes правильно, вот как я это исправил, спасибо.   -  person Ameen    schedule 22.11.2020


Ответы (1)


Используйте этот код для вставки:

def insert(coll: BSONCollection, document: BSONDocument): Future[Unit] = {
    val writeRes: Future[WriteResult] = coll.insert.one(document)
    writeRes.onComplete { // Dummy callbacks
      case Failure(e) => e.printStackTrace()
      case Success(writeResult) =>
        println(s"successfully inserted document with result: $writeResult")
    }
    writeRes.map(_ => {}) // in this example, do nothing with the success
  }

И если вы хотите обновить, используйте это:

def update(collection: BSONCollection, item_id: Long, modifier: BSONDocument) = {
    val selector = BSONDocument("_id" -> item_id)
    println("selector ===" + selector)

    val futureUpdate = collection.update.one(
      q = selector, u = modifier,
      upsert = true, multi = false)

    futureUpdate.onComplete { // Dummy callbacks
      case Failure(e) => e.printStackTrace()
      case Success(writeResult) =>
        println(s"successfully updated document with result: $writeResult")
    }

  }
person swapnil shashank    schedule 25.06.2020