RxJava 2 эквивалентен isUnsubscribed

Я работал с примерами в книге Реактивное программирование с RxJava, предназначенное для версии 1, а не 2. Введение в бесконечные потоки представлено в следующем примере ( и отмечает, что есть лучшие способы борьбы с параллелизмом):

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
    Runnabler = () -> {
        BigInteger i = ZERO;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    };
    new Thread(r).start();
});

...

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();

Однако в RxJava 2 лямбда-выражение, переданное методу create(), имеет тип ObservableEmitter, и у него нет метода isUnsubscribed(). Я просмотрел Чем отличается версия 2.0, а также выполнил поиск в репозитории, но не смог найти такой метод.

Как реализовать ту же функциональность в версии 2.0?

Отредактировано, чтобы включить решение, как указано ниже (примечание с использованием kotlin):

val naturalNumbers = Observable.create<BigInteger> { emitter ->
    Thread({
        var int: BigInteger = BigInteger.ZERO
        while (!emitter.isDisposed) {
            emitter.onNext(int)
            int = int.add(BigInteger.ONE)
        }
    }).start()
}

val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }

Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()

person amb85    schedule 13.06.2017    source источник


Ответы (1)


После подписки на Observable возвращается Disposable. Вы можете сохранить его в свою локальную переменную и проверить disposable.isDisposed(), чтобы узнать, все еще ли он подписывается или нет.

person Tuby    schedule 13.06.2017