У меня есть следующий дизайн, который я хотел бы создать, но я не уверен, какой шаблон Rx ему соответствует. Цель более-менее похожа на сингл, но с условной проверкой.
- Есть один
Observable<String>
, и возможно любое количество наблюдателей. - Если запрос сделан впервые, наблюдаемый выполнит некоторый сетевой запрос, приняв строку, а затем выдаст обратный вызов (во многом похожий на завершаемый/одиночный)
- Любой последующий вызов с тем же ключом немедленно вернет тот же результат.
- Однако, если прошло 5 минут и был сделан тот же вызов, мы повторно получим данные, так как срок их действия истек, а затем отправим их любым слушателям. Этот результат сохраняется еще 5 минут, и цикл повторяется.
- Все данные сохраняются на основе отправленного ключа, как в шаблоне легковеса. Срок действия зависит от времени последнего запроса конкретного ключа.
Моя первоначальная мысль состояла в том, чтобы просто создать свой собственный класс с параллельными хэш-картами. Однако это будет означать, что мне придется самому обрабатывать множество механизмов потоковой передачи. Я чувствую, что RxJava будет отличным решением для этого, но я не уверен, что такие шаблоны существуют. У кого-нибудь есть идея?
Я понимаю, что целью Single<T>
является получение только одного ответа, поэтому мои условия могут быть неверными.
Ниже приведена моя попытка, которую я буду обновлять по ходу дела.
/**
* Created by Allan Wang on 07/01/18.
*
* Reactive flyweight to help deal with prolonged executions
* Each call will output a [Single], which may be new if none exist or the old one is invalidated,
* or reused if an old one is still valid
*
* Types:
* T input argument for caller
* C condition condition to check against for validity
* R response response within reactive output
*/
abstract class RxFlyweight<in T : Any, C : Any, R : Any> {
/**
* Given an input emit the desired response
* This will be executed in a separate thread
*/
protected abstract fun call(input: T): R
/**
* Given an input and condition, check if
* we may used cache data or if we need to make a new request
* Return [true] to use cache, [false] otherwise
*/
protected abstract fun validate(input: T, cond: C): Boolean
/**
* Given an input, create a new condition to be used
* for future requests
*/
protected abstract fun cache(input: T): C
private val conditionals = mutableMapOf<T, C>()
private val sources = mutableMapOf<T, Single<R>>()
private val lock = Any()
/**
* Entry point to give an input a receive a [Single]
* Note that the observer is not bound to any particular thread,
* as it is dependent on [createNewSource]
*/
operator fun invoke(input: T): Single<R> {
synchronized(lock) {
val source = sources[input]
// update condition and retrieve old one
val condition = conditionals.put(input, cache(input))
// check to reuse observable
if (source != null && condition != null && validate(input, condition))
return source
val newSource = createNewSource(input).cache()
sources.put(input, newSource)
return newSource
}
}
/**
* Open source creator
* Result will then be created with [Single.cache]
* If you don't have a need for cache,
* you likely won't have a need for flyweights
*/
open protected fun createNewSource(input: T): Single<R> =
Single.fromCallable { call(input) }
.timeout(20, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
fun reset() {
synchronized(lock) {
sources.clear()
conditionals.clear()
}
}
}
Observable.just(cachedData)
или возвращает запрос на дооснащениеObservable
. - person Tuby   schedule 08.01.2018Single
. ИспользуйтеScheduler.Worker
, чтобы запланировать удаление ключа по времени. - person akarnokd   schedule 08.01.2018Single
. Я вижу, что мне нуженSingleSubject
, но тогда я не уверен, чтоcache()
будет работать так, как ожидалось, поскольку мне придется каждый раз делать новый. Любые идеи? - person Allan W   schedule 08.01.2018doOnError(e -> cache.remove(key))
, и он отлично работает. Спасибо! - person Tuby   schedule 08.01.2018