У меня есть следующий код, который вставляет коллекцию pojos (для преобразования в документы json) в couchbase с помощью Observables:
public <T> long batchUpsert(Iterable<T> items, Function<T, JsonDocument> docCreator, Bucket couchbaseBucket) {
AtomicLong counter = new AtomicLong();
AsyncBucket asyncBucket = couchbaseBucket.async();
Observable<JsonDocument> observableFromDocs =
Observable
.from(items)
.map(elem -> docCreator.apply(elem))
.filter(elem -> elem!=null)//skip creating problematic docs. logging their info for troubleshooting
.flatMap(elem -> upsertDocument(elem, asyncBucket))
.retryWhen(
RetryBuilder.anyOf(BackpressureException.class, Exception.class)
.doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
log.error("Retrying load. Attempt {} For exception {}", integer,throwable.toString())
)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
.max(MAX_RETRIES)
.build()
);
observableFromDocs.subscribe(
(elem)-> {},
elem -> log.error("Document insertion failure", elem),
() -> {counter.incrementAndGet();log.debug("Completed ASYNC load ");});
return counter.get();
}
Этот код работает нормально, документы создаются, и выполняется асинхронный вызов для upsert, но ни один документ не загружается в couchbase, он терпит неудачу, и никакие исключения не регистрируются, почти как если бы поток умирает внутри, может ли кто-нибудь любезно указать, что я делаешь неправильно? Я собираюсь выдернуть волосы ... :)
Теперь я подтвердил, что он не работает для коллекций из 1 элемента, может кто-нибудь сказать мне, почему это происходит?