Скала Будущее.найти

В Scala 2.12 есть 2 метода Future.find.

@deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]

И его перегруженная версия

def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]

Оба имеют одинаковое описание

  /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result
   *  of the first `Future` with a result that matches the predicate, failed `Future`s will be ignored.
   *
   * @tparam T        the type of the value in the future
   * @param futures   the `scala.collection.immutable.Iterable` of Futures to search
   * @param p         the predicate which indicates if it's a match
   * @return          the `Future` holding the optional result of the search
   */

Итак, я предполагаю, что эти методы находят первый завершенный Future, который соответствует параметру p в данном списке.

Но на самом деле это делает только первый.

  val start = System.currentTimeMillis
  val a = (1 to 3).reverse.iterator.map{ x =>
    Future{
      Thread.sleep(x * 10000)
      x
    }
  }
  val b = Future.find(a)(_.isInstanceOf[Int])
  b.foreach{ x =>
    println(x)
    println(System.currentTimeMillis - start) // 10020 
  }

Устаревшая версия метода возвращает самую быструю.

  val a = (1 to 3).reverse.map{ x =>
    Future{
      Thread.sleep(x * 10000)
      x
    }
  }
  val b = Future.find(a)(_.isInstanceOf[Int])
  b.foreach{ x =>
    println(x)
    println(System.currentTimeMillis - start)
  }

Перегруженная версия возвращает самую медленную. Чтобы быть более точным, он просто проверяет заданный список от начала до конца, и ему все равно, сколько времени потребуется для их завершения.

Это как должно быть? Если да, то использует ли дублированный или реализует его сам единственный вариант, чтобы заботиться о том, чтобы их время было завершено?


person suish    schedule 13.12.2018    source источник
comment
Что вы подразумеваете под самым медленным или самым быстрым? Я не вижу, чтобы вы где-нибудь отслеживали индивидуальное будущее завершение.   -  person sarveshseri    schedule 13.12.2018
comment
Кроме того, во втором примере, поскольку a является iterator; он будет использован val c = Future.firstCompletedOf(a), а val b = Future.find(a)(_.isInstanceOf[Int]) получит пустой iterator.   -  person sarveshseri    schedule 13.12.2018
comment
@SarveshKumarSingh удалил строку. Это была какая-то ошибка копирования и вставки   -  person suish    schedule 17.12.2018


Ответы (1)


Вы правы в том, что устаревший Future.find, который ожидает TraversableOnce[Future[T]] в 2.12.x, ведет себя иначе, чем заменяющий Future.find. Как вы можете видеть из вставленного исходного кода ниже, первый метод find использует Promise с tryComplete для эффективного захвата первого завершенного будущего из входной коллекции, тогда как последний использует простой обход hasNext/next:

@deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
  val futuresBuffer = futures.toBuffer
  if (futuresBuffer.isEmpty) successful[Option[T]](None)
  else {
    val result = Promise[Option[T]]()
    val ref = new AtomicInteger(futuresBuffer.size)
    val search: Try[T] => Unit = v => try {
      v match {
        case Success(r) if p(r) => result tryComplete Success(Some(r))
        case _ =>
      }
    } finally {
      if (ref.decrementAndGet == 0) {
        result tryComplete Success(None)
      }
    }

    futuresBuffer.foreach(_ onComplete search)

    result.future
  }
}

def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
  def searchNext(i: Iterator[Future[T]]): Future[Option[T]] =
    if (!i.hasNext) successful[Option[T]](None)
    else {
      i.next().transformWith {
        case Success(r) if p(r) => successful(Some(r))
        case other => searchNext(i)
      }
    }
  searchNext(futures.iterator)
}

Одним из подходов к реализации собственного может быть расширение метода Future.firstCompletedOf с добавленным предикатом во что-то вроде следующего:

def firstConditionallyCompletedOf[T](futures: List[Future[T]])(p: T => Boolean)(implicit ec: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
    override def apply(v1: Try[T]): Unit = getAndSet(null) match {
      case null => ()
      case some => some tryComplete v1
    }
  }
  futures.foreach{ _.filter(condition).onComplete(firstCompleteHandler) }
  p.future
}
person Leo C    schedule 13.12.2018