Как обрабатывать удаление в RxJava без InterruptedException

В приведенном ниже коде, вырезанном при вызове dispose(), поток эмиттера прерывается (InterruptedException выходит из режима сна).

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();

Из сеанса отладки я вижу, что прерывание исходит от FutureTask, которое отменяется во время удаления. Там поток, вызывающий dispose(), проверяется на соответствие потоку-исполнителю, и если он не совпадает, эмиттер прерывается. Поток отличается, поскольку я использовал вычисление Scheduler.

Есть ли способ заставить dispose не прерывать такой эмиттер, или как это всегда нужно обрабатывать? Проблема, которую я вижу с этим подходом, заключается в том, что у меня была бы прерываемая операция (смоделированная здесь с помощью сна), которую я хотел бы завершить в обычном режиме перед вызовом onComplete().


person byyyk    schedule 30.05.2019    source источник


Ответы (1)


См. Что изменилось в версии 2.0 - Обработка ошибок .

Одним из важных требований к дизайну для 2.x является недопустимость ошибок Throwable. Это означает, что ошибки не могут быть сгенерированы, потому что жизненный цикл нижестоящего уже достиг своего конечного состояния или нижестоящий отменил последовательность, которая собиралась выдать ошибку.

Таким образом, вы можете либо обернуть все внутри try / catch, либо правильно выдать ошибку:

Observable<Integer> obs = Observable.create(emitter -> {
   try {
      // ...
   } catch (InterruptedException ex) {
      // check if the interrupt is due to cancellation
      // if so, no need to signal the InterruptedException
      if (!disposable.isDisposed()) {
         observer.onError(ex);
      }
   }
});

или настройте потребителя глобальной ошибки, чтобы игнорировать ее:

RxJavaPlugins.setErrorHandler(e -> {
    // ..
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    // ...
    Log.warning("Undeliverable exception received, not sure what to do", e);
});
person Gustavo    schedule 30.05.2019