Как реактивные потоки используются в Slick для вставки данных

В документации Slick представлены примеры использования Reactive Streams только для чтения. data как средство DatabasePublisher. Но что происходит, когда вы хотите использовать свою базу данных в качестве приемника и обратного давления в зависимости от скорости вставки?

Я искал эквивалентный DatabaseSubscriber, но его не существует. Итак, вопрос в том, если у меня есть источник, скажем:

val source = Source(0 to 100)

как я могу создать раковину с помощью Slick, которая записывает эти значения в таблицу со схемой:

create table NumberTable (value INT)


person tonicebrian    schedule 04.04.2016    source источник


Ответы (3)


Серийные вставки

Самый простой способ — сделать вставки внутри Sink.foreach.

Предполагая, что вы использовали генерацию кода схемы и далее таблица называется "NumberTable"

//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig"

Мы можем написать функцию, которая выполняет вставку

def insertIntoDb(num : Int) = 
  numberTableDB run (Numbertable += NumbertableRow(num))

И эту функцию можно поместить в Раковину

val insertSink = Sink[Int] foreach insertIntoDb

Source(0 to 100) runWith insertSink

Пакетные вставки

Вы можете дополнительно расширить методологию Sink, группируя N вставок за раз:

def batchInsertIntoDb(nums : Seq[Int]) = 
  numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))

val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb

Этот пакетный приемник может питаться Flow, который группирует пакеты:

val batchSize = 10

Source(0 to 100).via(Flow[Int].grouped(batchSize))
                .runWith(batchInsertSink)
person Ramón J Romero y Vigil    schedule 05.04.2016

Хотя вы можете использовать Sink.foreach для достижения этого (как упоминал Рамон), безопаснее и, вероятно, быстрее (путем параллельного запуска вставок) использовать mapAsync Flow. Проблема, с которой вы столкнетесь при использовании Sink.foreach, заключается в том, что она не имеет возвращаемого значения. Вставка в базу данных с помощью метода db.run сликов возвращает Future, который затем выходит из потоков, возвращаемых Future[Done], что завершается, как только Sink.foreach завершается.

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
  def value = column[Int]("value")
  def * = value
}

val numbers = TableQuery[Numbers]

val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)

val streamFuture: Future[Done] = Source(0 to 100)
  .runWith(Sink.foreach[Int] { (i: Int) =>
    db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
  })
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")

//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done    <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done

С другой стороны, def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T]) Flow позволяет вам запускать вставки параллельно через параметр parallelism и принимает функцию из исходного исходного значения в будущее некоторого типа. Это соответствует нашей функции i => db.run(numbers += i). Самое замечательное в этом Flow то, что он затем передает результат этих Futures вниз по течению.

val streamFuture2: Future[Done] = Source(0 to 100)
  .mapAsync(1) { (i: Int) =>
    db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
  }
  .runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")

//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done    <-- stream Future[Done] returned after inserts finished

Чтобы доказать это, вы даже можете вернуть реальный результат из потока, а не Future[Done] (с Done, представляющим Unit). Этот поток также добавит более высокое значение параллелизма и пакетной обработки для дополнительной производительности. *

val streamFuture3: Future[Int] = Source(0 to 100)
  .via(Flow[Int].grouped(10)) // Batch in size 10
  .mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
  .runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")

// sample 3 output:
// stream 3 done, inserted 101 rows
  • Примечание. Вы, вероятно, не увидите более высокой производительности для такого небольшого набора данных, но когда я имел дело со вставкой 1,7 МБ, я смог добиться наилучшей производительности на своей машине с размером пакета 1000 и значением параллелизма 8, локально с помощью postgresql. Это было примерно в два раза лучше, чем без параллельного выполнения. Как всегда, когда речь идет о производительности, ваши результаты могут отличаться, и вы должны измерять их самостоятельно.
person Chris Balogh    schedule 01.08.2017

Я считаю, что документация Alpakka превосходна, а DSL позволяет очень легко работать с реактивными потоками.

Это документы для Slick: https://doc.akka.io/docs/alpakka/current/slick.html

Пример вставки:

Source(0 to 100)
    .runWith(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.sink(value => sqlu"INSERT INTO NumberTable VALUES(${value})")
    )
person Sarin Madarasmi    schedule 24.05.2020