Реализация клапана для наблюдаемых потоков, включая буферизацию последнего элемента, испускаемого перед повторным открытием клапана.

Я пытаюсь понять, как реализовать что-то в RxJava (2.0). Это для Android, и я использую Kotlin, хотя выбор платформы и языка здесь не должен иметь значения.

Идея состоит в том, что я бы основал какую-то архитектуру MVP на RxJava. В этой реализации я думаю о том, что Activity (может быть Fragment или пользовательское View) предоставляет поток значений (Booleans для простоты), которые указывают на события жизненного цикла или на то, прикреплено ли представление или отсоединено.

Основная идея в основном такова:

private val lifecycleEvents = PublishSubject.create<Boolean>()
val screenStates: Observable<Boolean> = lifecycleEvents.hide()

override fun onResume() {
    super.onResume()
    lifecycleEvents.onNext(true) // I'm attached!
}

override fun onPause() {
    lifecycleEvents.onNext(false) // I'm detached!
    super.onPause()
}

override fun onDestroy() {
    lifecycleEvents.onComplete() // I'm gone        
    super.onDestroy()
}

А затем с другого конца Presenter выставляет Observable, который представляет собой поток объектов, представляющих состояния экрана, которые должны быть отображены View.

(Это соответствует концепции, описанной в этой серии http://hannesdorfmann.com/android/mosby3-mvi-1, что сводится к тому факту, что Presenter передает представление с автономными объектами, полностью инкапсулирующими состояния экрана, а не с несколькими различными методами представления).

И затем я хотел бы связать эти два наблюдаемых потока, чтобы:

  • Всякий раз, когда View отсоединяется, ввод от Presenter игнорируется (и он не буферизуется, чтобы не сталкиваться с проблемами обратного давления)

  • Однако после повторного подключения представления оно получает последнее состояние, отправленное докладчиком. Другими словами, буферизовать следует не более одного экземпляра состояния.

Это будет работать следующим образом (предполагая, что состояния имеют тип String для простоты):

val merged: Observable<String> = ???

val attached = true
val disattached = false        

screenStates.onNext(attached)
fromPresenter.onNext("state A")
fromPresenter.onNext("state B")

screenStates.onNext(disattached)
fromPresenter.onNext("state C") // this won't survive at the end
fromPresenter.onNext("state D") // this will "override" the previous one.
// as that's the last state from BEFORE the screen is reattached

screenStates.onNext(attached)
// "state D" should be replayed at this point, "state C" is skipped and lost

fromPresenter.onNext("state E")

// what "merged" is supposed to have received at this point:
// "state A", "state B", "state D", "state E"

Я не уверен, какое лучшее идиоматическое решение.

Я пытался реализовать это как ObservableTransformer, но у меня не получилось. Я считаю, что преобразователь должен быть без гражданства, тогда как мое решение тяготело к явному отслеживанию того, что было испущено, и буферизации последнего элемента «вручную» и т. Д., Что кажется грязным и слишком императивным, поэтому я полагаю, что это неправильно.

Я нашел https://github.com/akarnokd/RxJava2Extensions/blob/master/src/main/java/hu/akarnokd/rxjava2/operators/FlowableValve.java, но реализация выглядит очень сложной, и я не могу поверить, что это могло не сделать проще (мне не нужна вся гибкость, я хочу только то, что работает для описанного варианта использования).

Буду признателен за любую информацию, в том числе о том, есть ли что-то еще, что я должен принять во внимание в контексте Android. Также обратите внимание, что я не использую привязки RxKotlin (могу, я просто не предполагал, что они здесь потребуются).

ИЗМЕНИТЬ:

Ниже моя текущая реализация. Как я уже сказал, я не слишком доволен этим, потому что это явное сохранение состояния, и я считаю, что это должно быть достигнуто декларативно, используя некоторые конструкции RxJava.

Мне нужно было объединить два потока разных типов, и поскольку ни combineLatest, ни zip не совсем справлялись, я прибегнул к хитрости, создав общую оболочку для обоих разных типов событий. Это снова вводит определенные накладные расходы.

sealed class Event
class StateEvent(val state: String): Event()
class LifecycleEvent(val attached: Boolean): Event()

class ValveTransformer(val valve: Observable<Boolean>) : ObservableTransformer<String, String> {
    var lastStateEvent: Event? = null
    var lastLifecycleEvent = LifecycleEvent(false)

    private fun buffer(event: StateEvent) {
        lastStateEvent = event
    }

    private fun buffer(event: LifecycleEvent) {
        lastLifecycleEvent = event
    }

    private fun popLastState(): String {
        val bufferedState = (lastStateEvent as StateEvent).state
        lastStateEvent = null
        return bufferedState
    }

    override fun apply(upstream: Observable<String>): ObservableSource<String> = Observable
            .merge(
                    upstream.map(::StateEvent).doOnNext { buffer(it) }, 
                    valve.distinctUntilChanged().map(::LifecycleEvent).doOnNext { buffer (it) })
            .switchMap { when {
                it is LifecycleEvent && it.attached && lastStateEvent != null ->
                    // the screen is attached now, pump the pending state out of the buffer
                    just(popLastState())
                it is StateEvent && lastLifecycleEvent.attached -> just(it.state)
                else -> empty<String>()
            } }
}

person Konrad Morawski    schedule 18.06.2017    source источник


Ответы (2)


Чтобы объединить ответ @TpoM6oH с исходным предложением:

val bufferedEvent: Observable<Event> = BehaviorSubject.create()
bufferedEventResult = valve.switchMap( 
     viewEvent -> if (viewEvent) 
                       bufferedEvent 
                  else Observable.never() )

Оператор switchMap() отвечает за подписку и отмену подписки.

Затем вы можете разделить полученную наблюдаемую на необходимые состояния и события, используя publish(). Я не уверен, зачем нужен ObservableTransformer.

person Bob Dalgleish    schedule 19.06.2017
comment
Спасибо, Боб! Я не могу попробовать это в данный момент (слишком занят на работе, и это только для моего побочного проекта), но я займусь этим. Я проголосовал за ваш ответ и приму его, как только проверю решение. - person Konrad Morawski; 21.06.2017
comment
Честно говоря, я не уверен, как этот код вписать в мой ObservableTransformer... И зачем буферизовать оба? Мне нужно только буферизовать состояния, а не события жизненного цикла - person Konrad Morawski; 01.07.2017
comment
Хорошо, я буферизирую события жизненного цикла. Все еще не уверен, как его включить в {{ObservableTransformer}}. Например. как и где я могу подписаться {{bufferedState}} на входящий поток (который подлежит преобразованию)? Как и где я буду отписываться потом? - person Konrad Morawski; 01.07.2017
comment
В моем примере есть два отдельных переключателя, так как я не понял, что вам нужно их объединить. Я отредактирую свой пример. - person Bob Dalgleish; 01.07.2017
comment
Спасибо. Идея и цель трансформера — решить проблему жизненного цикла в MVP/Android. Представление можно прикреплять и повторно прикреплять (на Android тот же экран фактически повторно инициализируется, например, при повороте устройства), и Presenter должен воздерживаться от подачи в представление состояний экрана до тех пор, пока оно не будет повторно подключено. Часто это делается путем реализации методов жизненного цикла в самом Presenter и подписки/отмены подписки на View, когда это происходит. Я бы хотел, чтобы View вместо этого испускал поток (скажем) логических значений, выступая в качестве клапана, и чтобы это решалось на уровне одного потока без отказа от подписки. - person Konrad Morawski; 02.07.2017

Мне кажется, что вы ищете BehaviorSubject - это субъект, который отправляет самый последний элемент, который он наблюдал, и все последующие наблюдаемые элементы каждому подписанному наблюдателю.

Если вы используете его в презентере, отмените подписку на него, когда представление отсоединено, и подпишитесь на него, когда представление будет прикреплено, вы должны получить то, что хотите.

person TpoM6oH    schedule 18.06.2017
comment
Спасибо за ваш ответ. Я знаю о BehaviorSubject и о возможности его реализации таким образом. Но мне было интересно, достижимо ли это без отказа от подписки и повторной подписки. Я имею в виду, я знаю, что в конце концов мы должны отказаться от подписки, но я хотел бы сделать это только один раз, когда представление будет уничтожено, а не каждый раз, когда оно будет приостановлено. - person Konrad Morawski; 18.06.2017
comment
Кажется сложным сделать это полностью без отказа от подписки, но вы можете перенести эту логику на ведущего, имея тему поведения для всех ваших событий просмотра и другую тему, которая будет открыта для просмотра, отписывая открытую тему от темы поведения, когда вы получаете detach событие и подписка, когда вы получите attached запрос. Затем вы можете переместить это в базовый класс вашего докладчика, и это будет выглядеть довольно хорошо. - person TpoM6oH; 18.06.2017
comment
это тоже возможно, я думаю. В любом случае, я отредактировал вопрос, вставив свою текущую реализацию - настолько хорошо, насколько мне удалось придумать. Очевидно, здесь пригодится выразительность Kotlin — уродство эквивалентной реализации Java было бы гораздо более очевидным. - person Konrad Morawski; 18.06.2017