RxJS повторить всю цепочку

Я читаю изображения из живого потока и периодически выбираю пакет. Затем я отправляю их на сервер для проверки. Ошибка HTTP будет выдана, если какая-либо проверка не пройдена. Если это произойдет, я хочу получить новую партию изображений.

this.input.getImages()
  .throttleTime(500)
  .switchMap(image =>

      new Observable<{}>(observer => {
        // Some operation
      })
      .map(i => ({ image, i }))

  ).filter(({ i }) =>  {
    // some filtering
  })
  .map(({ image }) => image)
  .take(6)
  .bufferCount(6)
  .map(images =>  // switch map??
    Observable.fromPromise(this.server.validate(images))
  )
  .retry(2)  // This only retrys the request, I want it to retry the whole chain (to get valid images)
  .subscribe(images => {
      console.log('All done')
    },
    err => {console.log(err)}
  )

Проблема, с которой я сталкиваюсь, заключается в том, что повторяется только HTTP-запрос, поскольку это новый наблюдаемый объект. Должен ли быть какой-то способ инкапсулировать начало цепочки в один Observable?


person ovg    schedule 31.03.2018    source источник


Ответы (2)


См. раздел learnrxjs — повторить попытку. В примере показано, как все перезапускается из исходного кода при возникновении ошибки.

На странице показан синтаксис pipe, но JSBin показывает синтаксис гибкого оператора, если вы предпочитаете.

Основной шаблон

const retryMe = this.input.getImages()
  .flatMap(val => {
    Observable.of(val)
      // more operators
  })
  .retry(2);
person Richard Matsen    schedule 31.03.2018
comment
Можно ли получить новые значения из this.input.getImages() при повторной попытке? - person ovg; 01.04.2018
comment
Глядя на страницу документа, можно увидеть, что источник, т.е. интервал (1000), перезапускается с 0 каждый раз, когда происходит повторная попытка, поэтому да, я ожидаю, что this.input.getImages() будет снова запущен при повторной попытке. Проще всего попробовать и увидеть! Конечно, это зависит от характера getImages() - если это «горячий» наблюдаемый объект (например, веб-камера), вы получите новые изображения при повторной попытке. - person Richard Matsen; 01.04.2018
comment
ах, хорошо, теперь понял, мне нужно было взять ThrotTime, Take и BufferCount из внутреннего Observable (дух). Я столкнулся с каким-то странным поведением. Спасибо! - person ovg; 01.04.2018
comment
На самом деле, все, что было не так с моим кодом, это то, что я использовал switchMap вместо flatMap. В моем случае нет необходимости в Observable.of(val). - person ovg; 01.04.2018
comment
Что ж, основное различие между switchMap() и flatMap() заключается в том, что первый отменяет предыдущую внутреннюю наблюдаемую при поступлении нового ввода. Если в // Some operation нет задержки, то они оба должны работать одинаково. - person Richard Matsen; 01.04.2018

Простой способ - обернуть сложный наблюдаемый объект в defer и использовать повторную попытку. результирующий наблюдаемый.

person Andrey Nikolaev    schedule 28.07.2021