Как я могу выполнить операцию блокировки записи в Android с помощью RxAndroidBle. Только если операция записи прошла успешно, следует выполнить следующую команду.
protected void doWriteBytes(UUID characteristic, byte[] bytes) {
final Disposable disposable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.subscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
},
throwable -> onError(throwable)
);
compositeDisposable.add(disposable);
}
protected void test() {
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});
// following command should be only performed if doWriteBytes is successful executed
foo();
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});
bar();
}
Я знаю subscribe и onComplete, но можно ли обойтись и без этих методов?
Фон состоит в том, что я хочу переопределить метод тестирования в нескольких разных подклассах, поэтому я могу выполнять различные команды doWriteBytes (например, команды ACK) для отправки некоторых байтов на устройство Bluetooth, но мне нужно быть уверенным, что следующая команда выполняется только в том случае, если команда ACK отправлена успешно.
Возможно, это скорее проблема RxJava2, но я не совсем знаком с ней.
Изменить:
Спасибо за ответ @Dariusz Seweryn. Извините, мой вопрос, вероятно, был не совсем ясным. Постараюсь конкретизировать.
Я хочу написать исходный код как обычную функцию в test (), чтобы абстрагироваться от реализаций RxJava2. Единственное отличие состоит в том, что doWriteBytes и другие операции Bluetooth (уведомление, чтение) должны выполняться через RxAndroidBle. То, что я должен записать на устройство Bluetooth, зависит от байтов уведомлений или какого-либо другого алгоритма в методе test (). Кроме того, я хочу перезаписать метод test (), чтобы реализовать другой поток связи Bluetooth для совершенно другого устройства Bluetooth. Всегда важно, чтобы операции Bluetooth выполнялись последовательно.
Теперь у меня есть три идеи:
1) Моя первая идея - реализовать блокировку всех операций RxAndroidBle, поэтому я могу использовать простые, например, петли.
2) Моя вторая идея - динамически добавлять (concat?) Во время выполнения наблюдения к другому в методе test (), который последовательно обрабатывает, но мне всегда нужны возвращаемые значения из предыдущих наблюдений?
3) Моя третья идея - объединить операцию записи / уведомления / записи в качестве метода, который я могу вызвать в методе test (). Операция должна записывать байты в характеристику A, затем ждать уведомления по характеристике B, выполнять некоторую обработку полученных байтов и снова записывать в характеристику C. Но что записано или как процесс уведомления должен быть динамически во время выполнения в тесте () добавлен метод.
Может, есть изящное решение моей проблемы в RxJava2 или это вообще невозможно?
Edit2:
Я попытался реализовать все три идеи, но, к сожалению, мне это не удалось.
1)
connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.blockingSubscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
processBtQueue();
},
throwable -> onError(throwable)
);
Всегда блокирует даже при успехе? Надо ли его где-то выпустить? Кроме того, метод возвращает void, а не одноразовый, но тогда я не могу его избавиться.
2) Я борюсь с этой идеей. С какой наблюдаемой мне следует конкатенировать, если я не знаю начальную наблюдаемую? ConnectionObserable не работает, потому что он содержит RxBleConnection. Вторая проблема заключается в том, что значения после операции Bluetooth являются классами объектов Java !? Надо ли бросать каждый раз? У вас есть пример, как я могу объединить, например, операцию записи Bluetooth в результат уведомления Bluetooth?
3) Проблема с этой идеей в том, что я не знаю, как динамически добавлять во время выполнения часть обработки к уведомлению вне части подписки RxJava?
У меня есть рабочее решение для идеи №3
protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
Observable observable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.flatMap( writeBytes -> notificationObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR);
compositeDisposable.add(observable.subscribe());
return observable;
}
Кстати. я должен создавать отдельные потоки в stackoverflow с этими вопросами?
Если это поможет, вы можете найти мой экспериментальный исходный код здесь.