RxScala: как сохранить поток, выполняющий Observable.interval?

Я пытаюсь написать простую программу RxScala:

import rx.lang.scala.Observable

import scala.concurrent.duration.DurationInt
import scala.language.{implicitConversions, postfixOps}

object Main {
  def main(args: Array[String]): Unit = {
    val o = Observable.interval(1 second)
    o.subscribe(println(_))
  }
}

Когда я запускаю эту программу, я не вижу ничего напечатанного. Я подозреваю, что это связано с тем, что поток, производящий числа в Observable.interval, умирает. Я заметил вызов waitFor(o) в RxScalaDemo, но я не могу понять, откуда это импортировано.

Как заставить эту программу постоянно печатать числовую последовательность?


person Binil Thomas    schedule 03.04.2015    source источник


Ответы (2)


Вы ничего не видите, потому что ваш метод main завершает работу сразу после подписки на метод Observable. На этом ваша программа готова.

Обычный трюк для тестовых программ, подобных этой, заключается в чтении байта из стандартного ввода после подписки.

person Rob Worsnop    schedule 03.04.2015

Вот один из способов заблокировать выход основного потока:

val o = Observable.interval(1 second)
val latch = new CountDownLatch(1)
o.subscribe(i => {
  print(i)
  if (i >= 5) latch.countDown()

})
latch.await()

Это довольно распространенный шаблон, используйте CountDownLatch.await, чтобы заблокировать основной поток, а затем отсчитайте защелку, когда вы закончите то, что делаете, тем самым освободив основной поток.

person Biju Kunjummen    schedule 03.04.2015