Преобразование кода RxJS v4 в v5, обработка очереди с вытягиванием

---abcde-----f-------gh-----i---->  //Events

У меня есть «рабочая очередь», за которой я хочу наблюдать / подписываться. Это массив командных объектов для обработки. Новые элементы работы обычно поступают пачками, и их необходимо обрабатывать последовательно (в порядке получения, по одному, до полной обработки).

Я использую RxJS 5.0.0-beta.6. (версия, навязанная другими библиотеками)

Вот рабочий пример, иллюстрирующий желаемое поведение, но использующий RxJS v4.

Основной рассматриваемый код - это ...

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .timestamp()
  .tap(({timestamp}) => updatePanelAppend(pending, timestamp));

var inProgress$ = events$;

var done$ = inProgress$
  .flatMapWithMaxConcurrent(1, ({timestamp}) => 
                            Rx.Observable.fromPromise(() => {
                              updatePanelAppend(inProgress, timestamp);
                              removeFromPanel(pending, timestamp);
                              return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
                           }));

done$.subscribeOnNext((timestamp) => {
  updatePanelAppend(done, timestamp);
  removeFromPanel(inProgress, timestamp);
});

http://jsbin.com/meyife/edit?js,output

Учитывая текущее состояние бета-версии API и неполную / изменяющуюся документацию, я не могу понять, как это сделать в RxJS 5.

Обновление: это руководство по переходу для перехода с v4 - v5 показывает многие функции, которые были удалены, но не указывает, как делать что-то по-новому. Примеры удаленных операций: .tap, .controlled, .flatMapWithMaxConcurrent (переименован).


person Mark Eric    schedule 02.08.2016    source источник
comment
Вопрос, откуда взялось решение v4: stackoverflow.com/questions/38601451/   -  person Mark Eric    schedule 02.08.2016


Ответы (1)


  • _1 _ / _ 2_ - теперь принимает параметр параллелизма

  • tap -> do

  • subscribeOnNext больше не существует, поэтому просто используйте subscribe с одним параметром.

  • fromPromise в RxJS 5 нет перегрузки, поэтому используйте defer вместо.

См. Обновленный jsbin здесь

person paulpdaniels    schedule 03.08.2016
comment
Превосходно! Очень полезно! Примечание: после изменения версии RxJS на 5.0.0-beta.6 потребовались некоторые изменения. Как будто в этой версии нет оператора .timestamp (). Поэтому я просто вытащил отметку времени из события щелчка мыши, поскольку это не было центральным элементом кода. - person Mark Eric; 19.08.2016