Как выполнить операцию на основе нескольких результатов RxJava Completable

Некоторое время я ломал голову над этим, я потерялся здесь в управлении требованием, когда я должен использовать Rx в Котлине.

Позволь мне объяснить.

Существует набор идентификаторов, эквивалентные элементы которых должны быть удалены с сервера и, в конечном итоге, локально в зависимости от успеха сервера.

В принципе

  1. Сделать сетевой вызов для удаления одного id (поддерживаемый сетевой вызов возвращает Completable)
  2. если получен обратный вызов complete(успех), сохраните id в list (памяти)
  3. Выполните первый и второй шаги для всех id, которые необходимо удалить.
  4. После завершения каждого сетевого вызова передайте список для удаления из локальной базы данных.

Таким образом, доступны эти функции, которые нельзя изменить.

  1. fun deleteId(id: String): Completable { networkCall.deleteId(id) }
  2. fun deleteIds(ids: List<String>): Unit { localDb.deleteId(ids) }

Это то, что я пробовал, но, очевидно, неполное и застрявшее...

val deleted = CopyOnWriteArrayList<String>()
val error = CopyOnWriteArrayList<String>()
items?.filter { it.isChecked }
    ?.map { Pair(it.id, dataManager.deleteId(it.id)) }
    ?.forEach { (Id, deleteOp) ->
        deleteOp.subscribeOn(Schedulers.io())
                .subscribe(object: CompletableObserver {
                    override fun onComplete() { deleted.add(Id) }

                    override fun onSubscribe(d: Disposable) { disposableManager += d }

                    override fun onError(e: Throwable) { error.add(Id) }

                })
    }

Итак, теперь здесь есть несколько проблем. Одна из них - это требование, когда я не могу найти место, где можно узнать, что все запросы выполнены, чтобы инициировать удаление localDb.

Есть ли способ, где я могу использовать Flowable.fromIterable() или zip или merge каким-то образом, следуя цепочке команд, как указано выше, для достижения вышеуказанного сценария?


person Varun Muralidharan    schedule 20.08.2020    source источник


Ответы (1)


Если я правильно понял ваш вариант использования, то это должно делать:

// ids of items to delete, for illustration lets have some temp set
val ids = setOf<String>("1", "2", "3", "4")
val deleteIdSingles = mutableListOf<Single<String>>()
ids.forEach { id ->
    deleteIdSingles.add(
        api.deleteId(id)
            // when request successfully completes, return its id wrapped in a Single, instead of Completable
            .toSingle<String> { id }
            // return a flag when this request fails, so that the stream is not closed and other requests would still be executed
            .onErrorReturn { "FAILED" }
    )
}

Single.merge(deleteIdSingles)
    // collect the results of the singles (i.e. the ids of successful deletes), and emit a set of those ids once all the singles has completed
    .collect(
        { mutableListOf() },
        { deletedIds: MutableList<String>, id: String -> if (id != "FAILED") deletedIds.add(id) }
    )
    .observeOn(Schedulers.io())
    .subscribe(
        { deletedIds ->
                db.deleteIds(deletedIds)
        }, { error ->
            // todo: onError
        })
person Dat Pham Tat    schedule 20.08.2020