Управление побочными эффектами с помощью нескольких RxJava Observables

Я работаю со службой BLE GATT, которая имеет две характеристики, которые работают в тандеме. Одна характеристика предназначена только для записи, в которой вы можете отправить строковое значение в качестве запроса, а другая — характеристика только для уведомления, когда вы получаете ответ на запрос.

Служба уведомлений работает немного медленно, и важно не переходить к следующему запросу до того, как уведомление будет прочитано, иначе ответ будет потерян.

С этой целью я использовал RxAndroidBle Observables с отдельными каналами для характеристик записи и уведомления. Третий Observable предоставляет запросы. Однако записи шли слишком быстро.

ConnectableObservable notifyObservable =
    createNotifyObservable(NOTIFY_UUID).publish();

queryObserverable
    .doOnSubscribe(notifyObservable::connect)
    .doOnNext(query -> Log.d(TAG, "Processing query: " + query))
    .flatMap(query -> createWriteObservable(WRITE_UUID, query)))
    .doOnNext(request -> Log.d(TAG, "Write initiated."))
    .flatMap(request -> notifyObservable)
    .doOnNext(response -> Log.d(TAG, "Query response: " + response));

Итак, при запуске приложения вот что я вижу в журналах (включая метку времени, идентификатор процесса и идентификатор потока):

06-22 14:30:01.991 14085-15360 Processing query: Query1
06-22 14:30:02.011 14085-15360 Processing query: Query2
06-22 14:30:02.011 14085-15360 Processing query: Query3
06-22 14:30:07.261 14085-15443 Write initiated.
06-22 14:30:07.301 14085-15445 Write initiated
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.321 14085-15449 Write initiated.
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.351 14085-15453 Query response: Response3

Есть ли способ с RxJava гарантировать, что следующая запись произойдет только после получения ответа?

EDIT: при указании предлагаемого аргумента maxConnection для обоих вызовов flatMap вызовы происходят в правильном порядке, но только для первого запроса из Observable. Вот лог для этого случая:

06-22 14:39:09.841 22245-23079 Processing query: Query1
06-22 14:39:15.131 22245-23166 Write initiated.
06-22 14:39:15.201 22245-23169 Query response: Response1

person Tim Clemons    schedule 22.06.2016    source источник
comment
Можете ли вы указать идентификатор потока для каждой строки журнала?   -  person Dave Moten    schedule 22.06.2016
comment
Хорошо, я добавил больше деталей из журналов.   -  person Tim Clemons    schedule 23.06.2016


Ответы (2)


Попробуйте передать аргумент maxConcurrent = 1 вашим вызовам flatMap:

...
.flatMap(query -> createWriteObservable(WRITE_UUID, query), 1)
...
.flatMap(request -> createNotifyObservable(NOTIFY_UUID), 1)
...
person yurgis    schedule 22.06.2016
comment
Спасибо, я не знал об аргументе maxConcurrent. Это увенчалось частичным успехом, когда все вызывается в правильном порядке. Теперь эта проблема заключается в том, что оценивается только первый запрос из Observable. - person Tim Clemons; 23.06.2016
comment
Как сейчас выглядит ваш результат? - person yurgis; 23.06.2016
comment
Я добавил вывод журнала к вопросу. - person Tim Clemons; 23.06.2016
comment
Может быть, ваш notifyObservable испускает только один элемент? Попробуйте временно заменить его чем-то вроде Observable.just(1) внутри 2nd flatMap. Не застрянет ли он в этом случае? - person yurgis; 23.06.2016
comment
Да, в этом случае он все еще застревает. - person Tim Clemons; 23.06.2016
comment
В этом случае может возникнуть проблема обратного давления с исходным наблюдаемым: попробуйте заменить queryObservable на Observable.just(Query1,Query2,Query3) - person yurgis; 23.06.2016
comment
Тот же результат с заменой queryObservable. - person Tim Clemons; 23.06.2016
comment
Странно. Попробуйте изолировать проблему. Например, вот эквивалентный код, который работает для меня. Observable.just(Query1,Query2,Query3) .doOnNext(query -> System.out.println(Обработка запроса: + запрос)) .flatMap(query -> Observable.just(query).delay(100, TimeUnit.MILLISECONDS) , 1) .doOnNext(запрос -> System.out.println(Запись инициирована.)) .flatMap(запрос -> Observable.just(запрос).delay(200, TimeUnit.MILLISECONDS), 1) .doOnNext(ответ -> System.out.println(Ответ на запрос: + ответ)) .subscribe(); Thread.sleep(10000); - person yurgis; 23.06.2016

Если на каждый запрос есть ровно один ответ, вы можете упаковать свои запросы в функцию:

Observable<String> query(String query) {
  return notifyObservable.flatMap(notifyBytesObservable -> // to be sure that when we will start writing we're already listening
    Observable.combineLatest( // with combineLatest both Observables will be subscribed at the same time
      notifyBytesObservable.first(), // to unsubscribe from notification after the first one
      createWriteObservable(uuid, query), 
      { notificationBytes, writtenBytes -> notificationBytes }
    )
  )
    .map(notificationBytes -> String(notificationBytes))
}

И теперь вы должны иметь возможность совершать такие вызовы:

queryObserverable
  .flatMap(queryString -> query(queryString), 1)

Я надеюсь, это поможет вам.

Наилучшие пожелания

person Dariusz Seweryn    schedule 08.07.2016