Сбрасываемый шаблон Single Rx

У меня есть следующий дизайн, который я хотел бы создать, но я не уверен, какой шаблон Rx ему соответствует. Цель более-менее похожа на сингл, но с условной проверкой.

  • Есть один Observable<String>, и возможно любое количество наблюдателей.
  • Если запрос сделан впервые, наблюдаемый выполнит некоторый сетевой запрос, приняв строку, а затем выдаст обратный вызов (во многом похожий на завершаемый/одиночный)
  • Любой последующий вызов с тем же ключом немедленно вернет тот же результат.
  • Однако, если прошло 5 минут и был сделан тот же вызов, мы повторно получим данные, так как срок их действия истек, а затем отправим их любым слушателям. Этот результат сохраняется еще 5 минут, и цикл повторяется.
  • Все данные сохраняются на основе отправленного ключа, как в шаблоне легковеса. Срок действия зависит от времени последнего запроса конкретного ключа.

Моя первоначальная мысль состояла в том, чтобы просто создать свой собственный класс с параллельными хэш-картами. Однако это будет означать, что мне придется самому обрабатывать множество механизмов потоковой передачи. Я чувствую, что RxJava будет отличным решением для этого, но я не уверен, что такие шаблоны существуют. У кого-нибудь есть идея?

Я понимаю, что целью Single<T> является получение только одного ответа, поэтому мои условия могут быть неверными.

Ниже приведена моя попытка, которую я буду обновлять по ходу дела.

/**
 * Created by Allan Wang on 07/01/18.
 *
 * Reactive flyweight to help deal with prolonged executions
 * Each call will output a [Single], which may be new if none exist or the old one is invalidated,
 * or reused if an old one is still valid
 *
 * Types:
 * T    input       argument for caller
 * C    condition   condition to check against for validity
 * R    response    response within reactive output
 */
abstract class RxFlyweight<in T : Any, C : Any, R : Any> {

    /**
     * Given an input emit the desired response
     * This will be executed in a separate thread
     */
    protected abstract fun call(input: T): R

    /**
     * Given an input and condition, check if
     * we may used cache data or if we need to make a new request
     * Return [true] to use cache, [false] otherwise
     */
    protected abstract fun validate(input: T, cond: C): Boolean

    /**
     * Given an input, create a new condition to be used
     * for future requests
     */
    protected abstract fun cache(input: T): C

    private val conditionals = mutableMapOf<T, C>()
    private val sources = mutableMapOf<T, Single<R>>()

    private val lock = Any()

    /**
     * Entry point to give an input a receive a [Single]
     * Note that the observer is not bound to any particular thread,
     * as it is dependent on [createNewSource]
     */
    operator fun invoke(input: T): Single<R> {
        synchronized(lock) {
            val source = sources[input]

            // update condition and retrieve old one
            val condition = conditionals.put(input, cache(input))

            // check to reuse observable
            if (source != null && condition != null && validate(input, condition))
                return source

            val newSource = createNewSource(input).cache()

            sources.put(input, newSource)
            return newSource
        }
    }

    /**
     * Open source creator
     * Result will then be created with [Single.cache]
     * If you don't have a need for cache,
     * you likely won't have a need for flyweights
     */
    open protected fun createNewSource(input: T): Single<R> =
            Single.fromCallable { call(input) }
                    .timeout(20, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())

    fun reset() {
        synchronized(lock) {
            sources.clear()
            conditionals.clear()
        }
    }

}

person Allan W    schedule 07.01.2018    source источник
comment
Мне также был бы интересен ответ на этот вопрос, я реализовал слой данных, который действует как кеш, он проверяет ConcurrentHashMap, был ли сделан конкретный запрос за последние x минут, если да, он возвращает Observable.just(cachedData) или возвращает запрос на дооснащение Observable.   -  person Tuby    schedule 08.01.2018
comment
Это интересно. Но другая проблема заключается в том, что у вас есть несколько запросов во время наблюдения за модификацией. В вашем случае, разве вы не создадите новый запрос для каждого из них? Или вы заставляете их подписываться на один наблюдаемый объект? Если Single действительно кэширует свой ответ, я думаю, что смогу сделать карту строки наблюдаемой, и она может работать   -  person Allan W    schedule 08.01.2018
comment
Я не подписываюсь на один и тот же Observable, другими словами, если я делаю X-запрос и до его завершения я делаю еще один X-запрос, данные загружаются избыточно, потому что я добавляю данные в кеш после того, как запрос был проанализирован. Вот почему меня интересует какое-то решение этой общей проблемы   -  person Tuby    schedule 08.01.2018
comment
Этот ответ содержит шаблон, который можно адаптировать для Single. Используйте Scheduler.Worker, чтобы запланировать удаление ключа по времени.   -  person akarnokd    schedule 08.01.2018
comment
@akarnokd Я сделал частичное решение выше. Я решил использовать функцию для проверки кеша, чтобы не было зависших таймеров и для большего контроля. Одного я не знаю, как отменить Single. Я вижу, что мне нужен SingleSubject, но тогда я не уверен, что cache() будет работать так, как ожидалось, поскольку мне придется каждый раз делать новый. Любые идеи?   -  person Allan W    schedule 08.01.2018
comment
@akarnokd Я реализовал RxCache, где значения очищаются через 60 секунд после их получения, он работает потрясающе, за исключением того, что кэширует ошибки, что является непреднамеренным поведением при выполнении сетевого запроса. Любой способ игнорировать ошибки кеширования, только кешировать успешный ответ?   -  person Tuby    schedule 08.01.2018
comment
Хорошо, я добавил doOnError(e -> cache.remove(key)), и он отлично работает. Спасибо!   -  person Tuby    schedule 08.01.2018