RxJava (Kotlin), Observable.amb и PublishSubject не запускаются

Мы пытаемся соблюдать либо 15-секундный интервал, либо каждый раз, когда мы стреляем onNext по нашему объекту refreshEventsSubject, но безуспешно.

Тема инициирована так

private val refreshEventsSubject = PublishSubject<Long>()

А потом мы пытаемся это наблюдать вот так

Observable.merge(Observable.interval(0, 15, TimeUnit.SECONDS), refreshEventsSubject) .subscribe { ... }

Мы получаем события из интервала каждые 15 секунд, но объект не стреляет после бега.

refreshEventsSubject.onNext(0)

Любые идеи приветствуются.

(Все написано на Котлине)


person Hannes Lohmander    schedule 16.11.2016    source источник
comment
В RxJava вы создаете PublishSubject через PublishSubject.create() статический метод, потому что создание его через конструктор без параметров не работает. Я не знаю, компенсирует это RxKotlin или нет.   -  person akarnokd    schedule 16.11.2016
comment
Да, привязка Kotlin rx для PublishSubject выглядит так fun <T> PublishSubject() : PublishSubject<T> = PublishSubject.create()   -  person Hannes Lohmander    schedule 16.11.2016


Ответы (2)


Убедитесь, что refreshEventsSubject.onNext(0) не вызывается из вашего основного потока, так как это может вызвать взаимоблокировки!

Также используйте http://reactivex.io/documentation/operators/amb.html. чем слияние, поскольку слияние вызовет два события против вызова onNext по вашей теме.

person Richard    schedule 17.11.2016

Прочтите документацию по AMB: http://reactivex.io/documentation/operators/amb.html < / а>

В частности emit all of the items from only the first of these Observables to emit an item or notification.

Оператор, который вы ищете, вероятно, Observable.merge: http://reactivex.io/documentation/operators/merge.html

person Kiskae    schedule 16.11.2016
comment
Правда, это все еще не работает. Если мы подписываемся непосредственно на тему, мы получаем событие, но при объединении / слиянии с интервальным потоком ничего не происходит. - person Hannes Lohmander; 16.11.2016