Наблюдаемый: переключаться, если испущено менее X элементов

У меня есть список наблюдаемых obs1, obs2, obs3,...,

Каждый из них может выдавать несколько элементов (из базы данных mongodb), меня интересуют только первые N элементов. Я хочу убедиться, что запросы моих наблюдаемых выполняются только в случае необходимости. Другими словами, если obs1, например, производит больше, чем N, запрос за obs2 не должен выполняться и т. д.

Если я использую concat: Observable(obs1, obs2, obs3, ...).concat, все запросы могут выполняться параллельно в mongodb

По сути, я ищу операцию вроде obs1.switchIfX(obs2).switchIfX(obs3).....

Где X: текущее наблюдаемое испускает менее N элементов.

Любая идея, как я могу реализовать это требование в стиле rxscala?


person proximator    schedule 12.12.2017    source источник
comment
Если N равно 10 и obs1 произвело 5, следует ли сделать эти 5 доступными для нисходящего потока или их следует игнорировать и obs2 подписаться?   -  person akarnokd    schedule 13.12.2017


Ответы (2)


Вы можете попробовать что-то вроде этого (непроверенный):

Observable.just(obs1, obs2, obs3).flatten(maxConcurrent=1).take(N)

Аргумент maxConcurrent гарантирует, что flatten подписывается только на один наблюдаемый за раз, и как только N элементов будут выпущены, take отпишется от вышестоящего наблюдаемого, поэтому, если в этот момент obs2 или obs3 еще не подписаны, они никогда не будут подписаны. запускаться по желанию.

person Samuel Gruetter    schedule 13.12.2017

Вы можете собрать элементы из источника в список, проверить его размер в операторе flatMap и переключиться на другой источник, если длины недостаточно:

@Test
public void test() {
    Observable.range(1, 5)
    .compose(switchIfFewer(Observable.range(1, 8), 10))
    .compose(switchIfFewer(Observable.range(1, 15), 10))
    .test()
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
}

static <T> ObservableTransformer<T, T> switchIfFewer(Observable<T> other, int n) {
    return o -> {
        return o.toList()
        .flatMapObservable(list -> {
            if (list.size() < n) {
                return other;
            }
            return Observable.fromIterable(list);
        });
    };
}

Если вы не хотите получать больше N элементов, вы можете вместо этого указать o.take(n).toList().

person akarnokd    schedule 13.12.2017
comment
Большое спасибо, это очень полезно, но я хочу, чтобы первая N из obs1, obs2, obs3 в этом порядке. Это означает, что в вашем примере я ожидаю 1,2,3,4,5,1,2,3,4,5, в основном только obs1 и obs2 излучают, а obs3 нет. - person proximator; 13.12.2017
comment
Я не понимаю. Вам нужны первые N элементов из каждого источника? Это как just(obs1, obs2, obs3).concatMap(o -> o.take(N)). - person akarnokd; 13.12.2017
comment
Нет, всего N элементов. Предположим, N=5, obs1 выдает 2, obs2 выдает 4, тогда я возьму 2 элемента из obs1, 3 из obs2 и проигнорирую obs3, так как я уже собрал 5. - person proximator; 13.12.2017