Почему flatMap реализован с объединением в RxJava?

Почему в RxJava 1.x оператор flatMap () реализован с помощью слияния?

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

Из вызова flatMap () я могу вернуть только один Observable, соответствующий <? extends Observable<? extends R>>. Затем вызов map (func) переносит его в другой Observable, так что у нас есть что-то вроде этого Observable<? extends Observable<? extends R>>. Это заставляет меня думать, что вызов merge () после map (func) не нужен.

Говорят, что оператор merge () выполняет следующее:

Сглаживает Observable, который испускает Observable, в один Observable, который испускает элементы, испускаемые этими Observable, без какого-либо преобразования.

Теперь внутри плоской карты у нас может быть только Observable, который испускает один Observable. Зачем сливаться? Что мне здесь не хватает?

Спасибо.


person southerton    schedule 06.01.2017    source источник


Ответы (2)


Просмотр подписей может помочь:

Представьте, что с Observable<String> вы хотите flatMap к отдельным символам. Это можно сделать с помощью flatMap:

Observable.just("foo", "hello")
          .flatMap(s -> Observable.from(s.split("")))

Что за тип этого наблюдаемого? Это Observable<String>.

Теперь вместо flatMap используйте map с той же функцией. Что за тип?

Observable.just("foo", "hello")
          .map(s -> Observable.from(s.split("")))

Вы увидите, что это на самом деле _8 _... И если мы подпишемся на этот наблюдаемый и распечатаем сгенерированные элементы, мы получим следующее:

rx.Observable@5b275dab
rx.Observable@61832929

Не очень полезно. Хуже того, на эти Observable не подписаны, поэтому они не будут передавать никаких данных :(

Мы видим, что цель flatMap - позволить функции создать внутренний Observable<T> для каждого элемента источника, а затем подписаться на эти внутренние наблюдаемые и сгладить их выбросы вместе в выходных Observable<T>. И merge именно это и делает!

Чтобы убедиться в этом, оберните приведенный выше результат карты в Observable.merge(...):

Observable<Observable<String>> mapped = 
    Observable.just("foo", "hello")
              .map(s -> Observable.from(s.split("")));

Observable.merge(mapped)
          .subscribe(System.out::println);

Это выводит:

f
o
o
h
e
l
l
o
person Simon Baslé    schedule 06.01.2017

Теперь внутри плоской карты у нас может быть только Observable, который испускает один Observable.

Нет - у вас есть один наблюдаемый объект, который испускает один наблюдаемый на каждый наблюдаемый элемент источника. Итак, если ваш исходный наблюдаемый имеет больше элементов, у вас будет несколько испускаемых наблюдаемых.

Вот почему вам нужен merge().

person Tassos Bassoukos    schedule 06.01.2017