Добавление документов с помощью async bucket на couchbase java sdk 2.7.15

У меня есть следующий код, который вставляет коллекцию pojos (для преобразования в документы json) в couchbase с помощью Observables:

    public <T> long  batchUpsert(Iterable<T> items, Function<T, JsonDocument> docCreator, Bucket couchbaseBucket) {

    AtomicLong counter = new AtomicLong();

    AsyncBucket asyncBucket = couchbaseBucket.async();
    Observable<JsonDocument> observableFromDocs =
            Observable
                    .from(items)
                    .map(elem -> docCreator.apply(elem))
                    .filter(elem -> elem!=null)//skip creating problematic docs. logging their info for troubleshooting
                    .flatMap(elem -> upsertDocument(elem, asyncBucket))
                    .retryWhen(
                            RetryBuilder.anyOf(BackpressureException.class, Exception.class)
                                    .doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
                                            log.error("Retrying load. Attempt {} For exception {}", integer,throwable.toString())
                                    )
                                    .delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
                                    .max(MAX_RETRIES)
                                    .build()
                    );

    observableFromDocs.subscribe(
            (elem)-> {},
            elem -> log.error("Document insertion failure", elem),
            () -> {counter.incrementAndGet();log.debug("Completed ASYNC load ");});

    return counter.get();
}

Этот код работает нормально, документы создаются, и выполняется асинхронный вызов для upsert, но ни один документ не загружается в couchbase, он терпит неудачу, и никакие исключения не регистрируются, почти как если бы поток умирает внутри, может ли кто-нибудь любезно указать, что я делаешь неправильно? Я собираюсь выдернуть волосы ... :)

Теперь я подтвердил, что он не работает для коллекций из 1 элемента, может кто-нибудь сказать мне, почему это происходит?


person Carlos Luis    schedule 18.11.2020    source источник


Ответы (1)


Я думаю, что ваш код не может дождаться завершения всех асинхронных операций, поэтому эта ситуация вызывает проблему. Официальный sdk dock содержит пример пакетных операций, и в этом примере говорится, что ожидание последней операции и блок вызова

person kakashi hatake    schedule 25.03.2021