Как обрабатывать поток RxJS n элементов за раз, и после того, как элемент будет выполнен, снова автоматически заполнить n?

У меня есть поток событий, и я хотел бы вызвать функцию, которая возвращает обещание для каждого из этих событий, проблема в том, что эта функция очень дорогая, поэтому я хотел бы обрабатывать не более n событий за раз.

Эта диаграмма гальки, вероятно, неверна, но это то, что я хотел бы:

---x--x--xxxxxxx-------------x------------->  //Events
---p--p--pppp------p-p-p-----p------------->  //In Progress
-------d--d--------d-d-dd------dddd-------->  //Promise Done

---1--21-2-34-----------3----4-3210--------   //QUEUE SIZE CHANGES

Это код, который у меня есть до сих пор:

var n = 4;
var inProgressCount = 0;

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .map((ev) => new Date().getTime());

var inProgress$ = events$.controlled();

var done$ = inProgress$      
  .tap(() => inProgressCount++)
  .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));

done$.subscribeOnNext((timestamp) => {
  inProgressCount--;
  inProgress$.request(Math.max(1, n - inProgressCount));
});

inProgress$.request(n);

Есть две проблемы с этим кодом:

  1. Он использует переменную inProgressCount, которая обновляется функциями побочных эффектов.
  2. Подписка done$ вызывается только один раз, когда я запрашиваю более 1 элемента из контролируемого потока. Это приводит к неправильному обновлению переменной inProgressCount, что в конечном итоге ограничивает очередь до одной за раз.

Вы можете увидеть, как это работает здесь: http://jsbin.com/wivehonifi/1/edit?js,console,output

Вопросы:

  1. Есть ли лучший подход?
  2. Как я могу избавиться от переменной inProgressCount?
  3. Почему подписка done$ вызывается только один раз при запросе нескольких элементов?

Обновление:
Ответ на вопрос № 3: switchMap такой же, как flatMapLatest, поэтому я получил только последний. Обновлен код на flatMap вместо switchMap.


person martincastell    schedule 26.07.2016    source источник


Ответы (1)


На самом деле вам вообще не нужно использовать противодавление. Есть оператор flatMapWithMaxConcurrent, который сделает это за вас. По сути, это псевдоним для вызова .map().merge(concurrency), и он позволяет одновременно выполнять только максимальное количество потоков.

Я обновил ваш jsbin здесь: http://jsbin.com/weheyuceke/1/edit?js,output

Но я прокомментировал важный бит ниже:

const concurrency = 4;

var done$ = events$
  //Only allows a maximum number of items to be subscribed to at a time
  .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) =>   
      //This overload of `fromPromise` defers the execution of the lambda
      //until subscription                    
      Rx.Observable.fromPromise(() => { 
        //Notify the ui that this task is in progress                                 
        updatePanelAppend(inProgress, timestamp);
        removeFromPanel(pending, timestamp);
        //return the task
        return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
     }));
person paulpdaniels    schedule 27.07.2016
comment
В RxJs 5 вы должны заменить .flatMapWithMaxConcurrent на .mergeMap(mapper, (resultSelectorFunction | null), concurrency) mergeMap - person user3717718; 22.08.2016