Контекст использует Couchbase для реализации службы REST CRUD в двухуровневом хранилище документов. Модель данных - это индексный документ, указывающий на ноль или более документов элементов. Индексный документ извлекается как Observable с помощью асинхронного получения. За ним следует .flatMap (), который извлекает ноль или более идентификаторов для каждого документа элемента. Async get возвращает Observable, поэтому теперь Observable, который я создаю, является Observable>. Я хочу связать оператор .merge (), который будет принимать «Observable, который испускает Observables и объединяет их вывод с выводом одного Observable», чтобы процитировать документацию ReactiveX :) Затем я буду .subscribe () на этот единственный Наблюдаемый для получения документов элемента. Оператор .merge () имеет много сигнатур, но я не могу понять, как использовать его в цепочке операторов следующим образом:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return items;
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.merge( ???????? )
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
РЕДАКТИРОВАТЬ:
Вы, наверное, догадались, что я реактивный новичок. Ответ от @akarnokd помог мне понять, что я пытаюсь сделать глупо. Решение состоит в том, чтобы объединить выбросы от элементов Observable<Observable<JsonDocument>>
внутри document
закрытия и вернуть результат этого. Это испускает результирующий JsonDocuments
из flatMap
:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return Observable.merge(items);
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
Проверено и работает :)
Observable<Observable<JsonDocument>>
, но хотите получитьObservable<JsonDocument>
? - person TpoM6oH   schedule 21.10.2015