В приведенном ниже коде, вырезанном при вызове dispose()
, поток эмиттера прерывается (InterruptedException
выходит из режима сна).
Observable<Integer> obs = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (emitter.isDisposed()) {
System.out.println("> exiting.");
emitter.onComplete();
return;
}
emitter.onNext(i);
System.out.println("> calculation = " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
emitter.onComplete();
});
Disposable disposable = obs
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
Из сеанса отладки я вижу, что прерывание исходит от FutureTask
, которое отменяется во время удаления. Там поток, вызывающий dispose()
, проверяется на соответствие потоку-исполнителю, и если он не совпадает, эмиттер прерывается. Поток отличается, поскольку я использовал вычисление Scheduler
.
Есть ли способ заставить dispose не прерывать такой эмиттер, или как это всегда нужно обрабатывать? Проблема, которую я вижу с этим подходом, заключается в том, что у меня была бы прерываемая операция (смоделированная здесь с помощью сна), которую я хотел бы завершить в обычном режиме перед вызовом onComplete()
.