scala observable унифицировать наблюдаемые с последовательностью без промежуточного обновления структуры данных

У меня есть код, который вызывает cockbase для получения некоторых строк следующим образом:

val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
      couchbaseBucket.async().get(id))

Если у меня есть 1,2,3,4,5,6 в качестве входных ключей строк, а в БД существуют только строки 1,2,3, тогда наблюдаемые будут получать уведомления только о 1,2,3.

Однако мое требование состоит в том, чтобы я возвращал карту с 1,2,3 истинными (существуют в БД) и 4,5,6 с ложными (что означает отсутствие в БД). Мне удалось сделать это с помощью scala observable, однако я использую структуру данных промежуточной карты, чтобы вернуть общую карту, содержащую все идентификаторы. Ниже приведен пример кода, который имитирует мою проблему.

object Main extends App {
  import rx.lang.scala.Observable

  val idsToFetch = Seq(1,2,3,4,5,6)

  println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}

  private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
    val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
    // - How can I avoid the additional data structure?
    // - In this case a map, so that the function will return
    //   a map with all numbers and for each if exist in DB?
    // - I mean I want the function to return a map I don't 
    //   want to populate that map inside the observer,
    //   it's like a mini side effect I would rather simply 
    //   manipulate the stream.

    Observable.from(idsToFetch)
      .filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
      .subscribe(
      x => inAndNotInDB.put(x, true),
      e => println(e),
      () => idsToFetch.filterNot(inAndNotInDB.containsKey)
        .foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
    )

    inAndNotInDB
  }

}

В любом случае, чтобы сделать это без промежуточной карты (без заполнения промежуточной структуры данных, а только путем манипулирования потоком)? Он не выглядит чистым!! . Спасибо.


person Jas    schedule 14.11.2017    source источник


Ответы (3)


Ваша проблема, кажется, возникает из-за того, что вы используете flatMap, поэтому, если в БД нет данных для данного id, и вы получаете пустой Observable, flatMap просто не производит вывода для такого id. Итак, похоже, что вам нужен defaultIfEmpty, который переводится в Scala orElse. Вы можете использовать orElse для возврата некоторого значения по умолчанию внутри flatMap. Итак, чтобы изменить ваш пример:

def fetchFromDb(id: Int): Observable[String] = {
  if (id <= 3)
    Observable.just(s"Document #$id")
  else
    Observable.empty
}

def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
  Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
}

println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

который печатает

Список((1,истина), (2,истина), (3,истина), (4,ложь), (5,ложь), (6,ложь))

Или вы можете использовать Option для возврата Some(JsonDocument) или None, например

def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
  Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
}

println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

который печатает

Список((1,Некоторые(Документ №1)), (2,Некоторые(Документ №2)), (3,Некоторые(Документ №3)), (4,Нет), (5,Нет), (6, Никто))

person SergGr    schedule 22.11.2017

Один из способов сделать это заключается в следующем:

(1) преобразовать последовательность идентификаторов в Observable и map с помощью

id => (id, false)

... так что вы получите наблюдаемую типа Observable[(Int, Boolean)] (давайте назовем эту новую наблюдаемую first).

(2) извлекать данные из базы данных и map каждую извлеченную строку из:

(some_id, true)

... внутри Observable[(Int, Boolean)] (давайте назовем это наблюдаемым last)

(3) concat first и last.

(4) toMap результат (3). Повторяющиеся элементы из first будут удалены в процессе. (это будет ваш resultObsrvable)

(5) (возможно) собрать первый и единственный элемент наблюдаемого (вашей карты). Возможно, вы вообще не хотите этого делать, но если вы это сделаете, вы должны действительно понимать последствия блокировки для сбора результатов на этом этапе. В любом случае, этот шаг действительно зависит от специфики вашего приложения (как организовано threading\scheduling\io), но подход грубой силы должен выглядеть примерно так (см. этот демо для более подробной информации):

Await.result(resultObsrvable.toBlocking.toFuture, 2 seconds)
person Eugene Loy    schedule 17.11.2017

как насчет этого

Observable.from(idsToFetch)
        .filterNot(x => x._1 == 4 || x._1 == 5 || x._1 == 6)
        .foldLeft(idToFetch.map{_->false}.toMap){(m,id)=>m+(id->true)}
person Gary Gan    schedule 20.11.2017