RxKotlin — динамический массив наблюдателей

Я использую RxKotlin вместе с Retrofit 2.

Я пытаюсь понять, как иметь динамический список наблюдателей за одну операцию.

Первый наблюдатель должен инициировать операцию, а все дополнительные наблюдатели должны дождаться завершения или сбоя операции.

После завершения операции мне нужно выполнить манипуляцию с данными (сохранить в кеше/памяти), а затем уведомить всех наблюдателей.

Вот что я сделал:

class UserManager
{
    val observers = ArrayList<Observer<ArrayList<User>>>()
    var isFetchingUsers = false

    fun getUsers(observer: Observer<ArrayList<User>>)
    {
        observers.add(observer)

        if (isFetchingUsers)
        {
            return
        }

        api.getUserList.observeOn(AndroidSchedulers.mainThread()).subscribe(object : Observer<UserListResponse>
        {
            override fun onNext(response: UserListResponse)
            {
                // Do some manipulations on the response and notify all

                observers.forEach {
                    it.onNext(response.getUsers())
                }
            }

            override fun onError(e: Throwable)
            {
                observers.forEach {
                    it.onError(Throwable())
                }
            }

            override fun onComplete()
            {
                isFetchingUsers = false
                observers.clear()
            }

            override fun onSubscribe(d: Disposable)
            {
            }
        })
    }
}

Вот наблюдаемое создание Retrofit (это на Java..)

   /**
     * Get users
     */
    public Observable<UserListResponse> getUserList()
    {
        return mService.getUserList().subscribeOn(Schedulers.io());
    }

Я уверен, что есть лучший способ сделать это

Спасибо!


person dor506    schedule 09.11.2017    source источник


Ответы (1)


Вы можете использовать оператор share() для наблюдаемого. Только первая подписка заставит наблюдаемый выполнить процесс создания. Как только последний подписчик отпишется, наблюдаемый объект выполнит процесс уничтожения.

Observable<Long> v;
...
Observable<Long> sharedObservable = v
  .doOnSubscribe( () -> logger.debug("subscribe") )
  .doOnUnsubscribe( () -> logger.debug("unsubscribe") )
  .share();

...
Subscription v1 = sharedObservable.subscribe();
Subscription v2 = sharedObservable.subscribe();
...
v1.unsubscribe();
v2.unsubscribe();

Вы увидите, что операция подписки происходит только один раз. Вы должны увидеть, что с исходным наблюдаемым происходит только одна операция подписки и только одна отмена подписки.

person Bob Dalgleish    schedule 09.11.2017