Как использовать бесплатную монаду с Future[M[_]]

Я реализовал простой язык для процесса ETL, используя бесплатную монаду. При использовании List в качестве ввода и вывода как для выборки, так и для хранения данных все работает нормально. Однако я использую асинхронные библиотеки и работаю с Future[List]

общий импорт и определения

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.free.Free
import cats.free.Free._

sealed trait Ops[A]
type OpsF[A] = Free[Ops, A]

работаю с List

case class Fetch(offset: Int, amount: Int) extends Ops[List[Record]]
case class Store(recs: List[Record]) extends Ops[List[Response]]

def fetch(offset: Int, amount: Int): OpsF[List[Record]] = 
    liftF[Ops, List[Record]](Fetch(offset, amount))
def store(recs: List[Record]): OpsF[List[Response]] = 
    liftF[Ops, List[Response]](Store(recs))

def simpleEtl(offset: Int, amount: Int): Free[Ops, List[Response]]  = 
    fetch(offset, amount).flatMap(r => store(r))

не работает с Future[List]

case class Fetch(offset: Int, amount: Int) extends Ops[Future[List[Record]]]
case class Store(recs: List[Record]) extends Ops[Future[List[Response]]]

def fetch(offset: Int, amount: Int): OpsF[Future[List[Record]]] = 
    liftF[Ops, Future[List[Record]]](Fetch(offset, amount))
def store(recs: List[Record]): OpsF[Future[List[Response]]] = 
    liftF[Ops, Future[List[Response]]](Store(recs))

// explicit types in case I am misunderstanding more than I think
def simpleEtl(offset: Int, amount: Int): Free[Ops, Future[List[Response]]] = 
fetch(offset, amount).flatMap { rf: Future[List[Record]] =>
  val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] =>
    store(r)
  }
  getResponses
}

как и ожидалось, тип, возвращаемый из flatMap/map, неверен - я получаю не OpsF[Future], а Future[OpsF]

Error:(34, 60) type mismatch;
 found   :  scala.concurrent.Future[OpsF[scala.concurrent.Future[List[Response]]]]
(which expands to)  scala.concurrent.Future[cats.free.Free[Ops,scala.concurrent.Future[List[String]]]]
 required: OpsF[scala.concurrent.Future[List[Response]]]
(which expands to)  cats.free.Free[Ops,scala.concurrent.Future[List[String]]]
    val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] =>

мой текущий обходной путь состоит в том, чтобы store принять Future[List[Record]] и позволить интерпретатору отображать Future, но это кажется неуклюжим.

Проблема не относится к List - например. Option тоже было бы полезно.

Я делаю это неправильно? Есть ли для этого какой-то монадный преобразователь?


person kostja    schedule 13.06.2016    source источник
comment
Это похоже на типичный шаблон для преобразования монады, на первый взгляд кажется, что Haskell каким-то образом имеет FreeT, хотя не смог найти его в scalaz или cat.   -  person Ende Neu    schedule 13.06.2016
comment
scalaz имеет FreeT начиная с 7.2.0.   -  person Peter Neyens    schedule 13.06.2016
comment
Могу ли я указать вам на библиотеку 47 градусов с метким названием 47deg.github.io/fetch, которая скоро станет инкубатор на уровне типов? Имейте в виду, я не работаю на 47 градусов, но похоже, что у этого уже есть решение для большей части того, что вы хотите сделать.   -  person wheaties    schedule 13.06.2016
comment
да, выборка, кажется, движется в правильном направлении, но сейчас мне кажется, что мне нужно будет сильно настроить ее для моих нужд, таких как пакетная обработка. Я определенно буду следить за ним, пока он созревает.   -  person kostja    schedule 14.06.2016


Ответы (1)


Абстрактный тип данных Ops определяет алгебру для выборки и сохранения нескольких Record. Он описывает две операции, но это единственное, что должна делать алгебра. То, как на самом деле выполняются операции, не должно иметь никакого значения для Fetch и Store, единственная полезная вещь, которую вы ожидаете, это соответственно List[Record] и List[Response].

Делая ожидаемый тип результата Fetch и Store Future[List[Record]]], вы ограничиваете возможности интерпретации этой алгебры. Возможно, в ваших тестах вы не хотите асинхронно подключаться к веб-сервису или базе данных и просто хотите протестировать с помощью Map[Int, Result] или Vector[Result], но теперь вам необходимо вернуть Future, что делает тесты более сложными, чем они могли бы быть.

Но сказать, что вам не нужен ETL[Future[List[Record]]], не решить ваш вопрос: вы используете асинхронные библиотеки и, вероятно, хотите вернуть некоторые Future.

Начиная с вашей первой реализации:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.implicits._
import cats.free.Free

type Record = String
type Response = String

sealed trait EtlOp[T]
case class Fetch(offset: Int, amount: Int) extends EtlOp[List[Record]]
case class Store(recs: List[Record]) extends EtlOp[List[Response]]

type ETL[A] = Free[EtlOp, A]

def fetch(offset: Int, amount: Int): ETL[List[Record]] = 
  Free.liftF(Fetch(offset, amount))
def store(recs: List[Record]): ETL[List[Response]] = 
  Free.liftF(Store(recs))

def fetchStore(offset: Int, amount: Int): ETL[List[Response]] =
  fetch(offset, amount).flatMap(store)

Но теперь у нас все еще нет Futures ? Это работа нашего переводчика:

import cats.~>

val interpretFutureDumb: EtlOp ~> Future = new (EtlOp ~> Future) {
  def apply[A](op: EtlOp[A]): Future[A] = op match {
    case Store(records) => 
      Future.successful(records.map(rec => s"Resp($rec)"))
      // store in DB, send to webservice, ...
    case Fetch(offset, amount) =>
      Future.successful(List.fill(amount)(offset.toString))
      // get from DB, from webservice, ...
  }
}

С помощью этого интерпретатора (где, конечно, вы бы заменили Future.successful(...) чем-то более полезным) мы можем получить наш Future[List[Response]] :

val responses: Future[List[Response]] = 
  fetchStore(1, 5).foldMap(interpretFutureDumb)

val records: Future[List[Record]] = 
  fetch(2, 4).foldMap(interpretFutureDumb)

responses.foreach(println)
// List(Resp(1), Resp(1), Resp(1), Resp(1), Resp(1))
records.foreach(println)
// List(2, 2, 2, 2)

Но мы все еще можем создать другой интерпретатор, который не возвращает Future :

import scala.collection.mutable.ListBuffer
import cats.Id

val interpretSync: EtlOp ~> Id = new (EtlOp ~> Id) {
  val records: ListBuffer[Record] = ListBuffer()
  def apply[A](op: EtlOp[A]): Id[A] = op match {
    case Store(recs) => 
      records ++= recs
      records.toList
    case Fetch(offset, amount) =>
      records.drop(offset).take(amount).toList
  }
}

val etlResponse: ETL[List[Response]] =
  for { 
    _       <- store(List("a", "b", "c", "d"))
    records <- fetch(1, 2)
    resp    <- store(records)
  } yield resp

val responses2: List[Response] = etlResponse.foldMap(interpretSync)
// List(a, b, c, d, b, c)
person Peter Neyens    schedule 13.06.2016
comment
Ах, я вижу, имеет смысл. Похоже, я концептуально перепутал определения подъема с выполнением интерпретатором. Спасибо, Питер. - person kostja; 14.06.2016
comment
@peter-neyens Когда мы объединяем алгебру, можем ли мы объединить два интерпретатора - один возвращает Id, а другой возвращает Future? - person arjunswaj; 02.09.2017
comment
Я не уверен, каким образом вы хотите объединить двух переводчиков. Вы всегда можете интерпретировать программу дважды, один раз для Id и один раз для Future. - person Peter Neyens; 04.09.2017
comment
Объяснил как Бог! - person Alex Fruzenshtein; 10.03.2019