Блокировка операции записи RxAndroidBle

Как я могу выполнить операцию блокировки записи в Android с помощью RxAndroidBle. Только если операция записи прошла успешно, следует выполнить следующую команду.

protected void doWriteBytes(UUID characteristic, byte[] bytes) {
    final Disposable disposable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
            .subscribe(
                    value -> {
                        Timber.d("Write characteristic %s: %s",
                                BluetoothGattUuid.prettyPrint(characteristic),
                                byteInHex(value));
                    },
                    throwable -> onError(throwable)
            );

    compositeDisposable.add(disposable);
}


protected void test() {
  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});

  // following command should be only performed if doWriteBytes is successful executed
  foo();

  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});

  bar();
}

Я знаю subscribe и onComplete, но можно ли обойтись и без этих методов?

Фон состоит в том, что я хочу переопределить метод тестирования в нескольких разных подклассах, поэтому я могу выполнять различные команды doWriteBytes (например, команды ACK) для отправки некоторых байтов на устройство Bluetooth, но мне нужно быть уверенным, что следующая команда выполняется только в том случае, если команда ACK отправлена ​​успешно.

Возможно, это скорее проблема RxJava2, но я не совсем знаком с ней.

Изменить:

Спасибо за ответ @Dariusz Seweryn. Извините, мой вопрос, вероятно, был не совсем ясным. Постараюсь конкретизировать.

Я хочу написать исходный код как обычную функцию в test (), чтобы абстрагироваться от реализаций RxJava2. Единственное отличие состоит в том, что doWriteBytes и другие операции Bluetooth (уведомление, чтение) должны выполняться через RxAndroidBle. То, что я должен записать на устройство Bluetooth, зависит от байтов уведомлений или какого-либо другого алгоритма в методе test (). Кроме того, я хочу перезаписать метод test (), чтобы реализовать другой поток связи Bluetooth для совершенно другого устройства Bluetooth. Всегда важно, чтобы операции Bluetooth выполнялись последовательно.

Теперь у меня есть три идеи:

1) Моя первая идея - реализовать блокировку всех операций RxAndroidBle, поэтому я могу использовать простые, например, петли.

2) Моя вторая идея - динамически добавлять (concat?) Во время выполнения наблюдения к другому в методе test (), который последовательно обрабатывает, но мне всегда нужны возвращаемые значения из предыдущих наблюдений?

3) Моя третья идея - объединить операцию записи / уведомления / записи в качестве метода, который я могу вызвать в методе test (). Операция должна записывать байты в характеристику A, затем ждать уведомления по характеристике B, выполнять некоторую обработку полученных байтов и снова записывать в характеристику C. Но что записано или как процесс уведомления должен быть динамически во время выполнения в тесте () добавлен метод.

Может, есть изящное решение моей проблемы в RxJava2 или это вообще невозможно?

Edit2:

Я попытался реализовать все три идеи, но, к сожалению, мне это не удалось.

1)

connectionObservable
                .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .retry(BT_RETRY_TIMES_ON_ERROR)
                .blockingSubscribe(
                        value -> {
                            Timber.d("Write characteristic %s: %s",
                                    BluetoothGattUuid.prettyPrint(characteristic),
                                    byteInHex(value));
                            processBtQueue();
                        },
                        throwable -> onError(throwable)
                );

Всегда блокирует даже при успехе? Надо ли его где-то выпустить? Кроме того, метод возвращает void, а не одноразовый, но тогда я не могу его избавиться.

2) Я борюсь с этой идеей. С какой наблюдаемой мне следует конкатенировать, если я не знаю начальную наблюдаемую? ConnectionObserable не работает, потому что он содержит RxBleConnection. Вторая проблема заключается в том, что значения после операции Bluetooth являются классами объектов Java !? Надо ли бросать каждый раз? У вас есть пример, как я могу объединить, например, операцию записи Bluetooth в результат уведомления Bluetooth?

3) Проблема с этой идеей в том, что я не знаю, как динамически добавлять во время выполнения часть обработки к уведомлению вне части подписки RxJava?

У меня есть рабочее решение для идеи №3

protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
    Observable observable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
        .flatMap( writeBytes -> notificationObservable)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR);

    compositeDisposable.add(observable.subscribe());

    return observable;
}

Кстати. я должен создавать отдельные потоки в stackoverflow с этими вопросами?

Если это поможет, вы можете найти мой экспериментальный исходный код здесь.


person olie.xdev    schedule 18.02.2019    source источник
comment
Имейте в виду, что редактирование вашего сообщения не уведомляет людей, которые уже ответили   -  person Dariusz Seweryn    schedule 20.02.2019
comment
Спасибо за подсказку. Я снова отредактировал свой вопрос.   -  person olie.xdev    schedule 20.02.2019


Ответы (1)


Я знаю subscribe и onComplete, но можно ли обойтись и без этих методов?

Данный:

Completable foo() { ... }

Completable bar() { ... )

Можно было сделать:

Disposable testDisposable = connectionObservable
                                .flatMapCompletable(connection ->
                                    connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
                                        .andThen(foo())
                                        .andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
                                        .andThen(bar())
                                )
                                .subscribe(
                                     () -> { /* completed */ },
                                     throwable -> { /* error happened */ }
                                )

Завершив вышеуказанное с помощью одного .subscribe(), можно контролировать, сколько соединений будет завершено. В приведенном выше примере, если соединение преждевременно завершится во время первой записи, все последующие операции (foo(), запись, bar()) вообще не произойдут.

Изменить:

Все ваши идеи потенциально могут сработать - вы можете попробовать их.

Может, есть изящное решение моей проблемы в RxJava2 или это вообще невозможно?

Есть .blocking*() функций для _7 _ / _ 8 _ / _ 9_ классов, если они вам действительно нужны. Однако будьте осторожны - вы можете начать испытывать некоторые трудности с отладкой в ​​зависимости от вашей реализации из-за введения большего количества состояний в ваше приложение.

person Dariusz Seweryn    schedule 18.02.2019
comment
Спасибо за вашу помощь. Я решил свою проблему с помощью третьей идеи и добавления subscribeOn(Schedulers.io()), но мне все равно было бы интересно, как решить blockingSubscribe()? - person olie.xdev; 20.02.2019
comment
Не .blockingSubscribe(), а .blockingFirst(), например. Он вернет первую эмиссию блокирующим образом. Однако я не уверен, как каждая из этих функций справляется с утилизацией. - person Dariusz Seweryn; 21.02.2019
comment
хммм .... не совсем так, как вы ожидали. У меня проблема с врезными блокировками с блокировкой First. В любом случае спасибо за вашу отличную библиотеку RxAndroidBle с открытым исходным кодом. - person olie.xdev; 21.02.2019