Отслеживание [rx] наблюдаемых исключений в больших графах до исходного кода

Когда у вас есть большой наблюдаемый граф (т. е. наблюдаемый, составленный много раз с использованием merge, groupBy, join и т. д.), и возникает исключение, иногда трудно понять, откуда возникло исключение. Я хотел бы знать, можно ли узнать, где в исходном файле были вызваны операторы Observable. Пример должен прояснить это.

Например, учитывая следующие IllegalStateException: Only one subscriber allowed! и трассировку стека, я хотел бы знать, можно ли узнать, из каких строк с номерами operatorMerge,operatorFilter,operatorGroupBy и т. д. были вызваны мои исходные файлы. Так ли это? можно ли это как-то сделать, используя отладчик, операторы печати или иным образом?

java.lang.IllegalStateException: Only one subscriber allowed!
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS
ubscriber.java:124)
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS
ubscriber.java:81)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.
java:251)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.
java:236)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer
ge.java:215)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
85)
        at rx.internal.operators.**OperatorMerge**$MergeSubscriber.onNext(OperatorMerge.java:1
20)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.SingleDelayedProducer.emit(SingleDelayedProducer.java:80)

        at rx.internal.operators.SingleDelayedProducer.set(SingleDelayedProducer.java:63)
        at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservab
leList.java:93)
        at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44)
        at rx.internal.operators.**OperatorFilter**$1.onCompleted(OperatorFilter.java:42)
        at rx.internal.operators.OperatorTakeUntilPredicate$ParentSubscriber.onNext(Operat
orTakeUntilPredicate.java:54)
        at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
        at rx.internal.operators.**OperatorGroupBy**$GroupBySubscriber$2$2.onNext(OperatorGrou
pBy.java:286)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1
81)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB
y.java:340)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.
java:226)
        at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124
)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis
h.java:560)
        at rx.internal.operators.**OperatorPublish**$PublishSubscriber.onNext(OperatorPublish.
java:258)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscr
ibeFromIterable.java:98)
        at rx.Subscriber.setProducer(Subscriber.java:177)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java
:50)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java
:33)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer
ge.java:215)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
85)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
20)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124
)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis
h.java:560)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.
java:258)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:112)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onNext(OperatorGrou
pBy.java:286)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1
81)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB
y.java:340)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.
java:226)
        at rx.lang.scala.Subscriber$$anon$3.onNext(Subscriber.scala:198)
...

Эта проблема возникает в основном из-за того, что весь смысл Observable состоит в том, чтобы отделить а) код от б) при его выполнении. Но для отладки программы это кошмар. Итак, повторяя свой вопрос выше, я хотел бы знать, можно ли отследить каждую композицию до ее исходной строки в исходном коде.


person Luciano    schedule 03.11.2015    source источник


Ответы (2)


Были некоторые эксперименты с дополнительной отладочной информацией, но вся библиотека работала в 100 раз медленнее и была заброшена.

Проблема, вероятно, заключается в том, что flatMap следует за вашим groupBy, где вы подписываетесь на GroupedObservable, а также передаете его обратно в flatMap, который теперь не может на него подписаться: GroupedObservable можно использовать только один раз. Вам нужно использовать один из операторов publish() или replay() и соответствующим образом настроить логику функции.

person akarnokd    schedule 03.11.2015
comment
Спасибо за ваши комментарии; это действительно было исправлено путем shareing как groupBy, так и вложенных наблюдаемых. - person Luciano; 05.11.2015

Год спустя я все еще борюсь с этим и до сих пор не нашел хорошего способа отслеживать казни. Я обнаружил, что полагаюсь на размещение операторов печати в коде, чтобы увидеть, что происходит. Только так я могу получить следы происходящего.

Единственное, что мне показалось полезным, — это создать шаблон для этого, поэтому мне не нужно писать doOnNext(x => println(x)), чтобы показать, что происходит каждый раз:

  implicit class ObservableTrace[T](o : rx.lang.scala.Observable[T]) {
    import java.time.LocalTime
    def trace(name : String) : rx.lang.scala.Observable[T] = {
      def print(s: String) = println(s"${LocalTime.now} : $name : $s")
      (o doOnNext (x => print("next:" + x))
        doOnSubscribe print("subscribed")
        doOnCompleted print("completed")
        doOnError (e => print("error: " + e))
        doOnUnsubscribe print("unsubscribed")
        )
    }

Это делает редактирование кода быстрым — просто напишите myobservable.trace("My Observable") на паре ваших наблюдаемых, и это позволит легко увидеть, когда происходят различные события жизненного цикла.

person Luciano    schedule 22.11.2016