Как объединить вывод Observable, который испускает Observable, не разрывая цепочку операторов?

Контекст использует Couchbase для реализации службы REST CRUD в двухуровневом хранилище документов. Модель данных - это индексный документ, указывающий на ноль или более документов элементов. Индексный документ извлекается как Observable с помощью асинхронного получения. За ним следует .flatMap (), который извлекает ноль или более идентификаторов для каждого документа элемента. Async get возвращает Observable, поэтому теперь Observable, который я создаю, является Observable>. Я хочу связать оператор .merge (), который будет принимать «Observable, который испускает Observables и объединяет их вывод с выводом одного Observable», чтобы процитировать документацию ReactiveX :) Затем я буду .subscribe () на этот единственный Наблюдаемый для получения документов элемента. Оператор .merge () имеет много сигнатур, но я не могу понять, как использовать его в цепочке операторов следующим образом:

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create Observable that gets and emits zero or more 
        // Observable<Observable<JsonDocument>> ie. bucket.async().gets
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                itemArray.forEach(
                    (jsonObject) -> {
                        String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                        observer.onNext( 
                            bucket.async().get(itemId)
                        );
                    }
                    }
                );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return items;
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.merge( ???????? )
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

РЕДАКТИРОВАТЬ:

Вы, наверное, догадались, что я реактивный новичок. Ответ от @akarnokd помог мне понять, что я пытаюсь сделать глупо. Решение состоит в том, чтобы объединить выбросы от элементов Observable<Observable<JsonDocument>> внутри document закрытия и вернуть результат этого. Это испускает результирующий JsonDocuments из flatMap:

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create Observable that gets and emits zero or more 
        // Observable<Observable<JsonDocument>> ie. bucket.async().gets
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                itemArray.forEach(
                    (jsonObject) -> {
                        String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                        observer.onNext( 
                            bucket.async().get(itemId)
                        );
                    }
                    }
                );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return Observable.merge(items);
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

Проверено и работает :)


person pink spikyhairman    schedule 21.10.2015    source источник
comment
Итак, в настоящее время вы получаете Observable<Observable<JsonDocument>>, но хотите получить Observable<JsonDocument>?   -  person TpoM6oH    schedule 21.10.2015
comment
Да, это правильно. Если бы я начал цепочку слияния, я мог бы дать ей Observable ‹Observable ‹T››. Как отмечает stackoverflow.com/users/61158/akarnokd в своем ответе, у rxJava нет способа сделать это в цепочка операторов.   -  person pink spikyhairman    schedule 21.10.2015


Ответы (2)


Из-за выразительных ограничений Java у нас не может быть безпараметрического оператора merge(), который можно применить к Observble<Observable<T>>. Для этого потребуются методы расширения, такие как C #.

Следующее, что лучше всего сделать - это сделать идентификацию flatMap:

// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
person akarnokd    schedule 21.10.2015
comment
Если я использую .flatMap(), мне нужно будет использовать .toBlocking().first() или эквивалент в onNext, чтобы получить JsonDocument из Observable? Моя реализация в настоящее время использует bucket.async().get(itemId).toBlocking().first() в элементах Observable. Я пытался сделать это неблокирующим. А может я неправильно понял ваше предложение? - person pink spikyhairman; 21.10.2015
comment
Вы спросили, как сделать плавное слияние (). Это не имеет никакого отношения к тому, являются ли поток или значение в идентичности flatMap асинхронными или нет. Как вы хотите использовать сглаженные значения? - person akarnokd; 21.10.2015
comment
Ваш ответ правильный. Мое требование асинхронности - это не то, что я изначально просил, поэтому я отмечаю правильный ответ. Теперь у меня есть решение для асинхронной части, поэтому я отредактирую свой вопрос, чтобы показать решение этого ... - person pink spikyhairman; 22.10.2015

Вы можете вызвать toList(), чтобы собрать все отправленные элементы в один список. Не тестировал, но как насчет такого:

bucket.async()
  .get(id)
  .flatmap(document -> { return Observable.from((JsonArray)document.content().get("item")})
  .flatMap(bucket::get)
  .toList()
  .subscribe(results -> /* list of documents */);
person Julian Go    schedule 21.10.2015
comment
Спасибо за ответ. Это может сработать, но теперь я понимаю, как работает merge(), и отредактирую свой вопрос с помощью решения, которое использую - person pink spikyhairman; 22.10.2015