У меня есть поток событий, и я хотел бы вызвать функцию, которая возвращает обещание для каждого из этих событий, проблема в том, что эта функция очень дорогая, поэтому я хотел бы обрабатывать не более n событий за раз.
Эта диаграмма гальки, вероятно, неверна, но это то, что я хотел бы:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
Это код, который у меня есть до сих пор:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
Есть две проблемы с этим кодом:
- Он использует переменную
inProgressCount
, которая обновляется функциями побочных эффектов. Подписка done$ вызывается только один раз, когда я запрашиваю более 1 элемента из контролируемого потока. Это приводит к неправильному обновлению переменнойinProgressCount
, что в конечном итоге ограничивает очередь до одной за раз.
Вы можете увидеть, как это работает здесь: http://jsbin.com/wivehonifi/1/edit?js,console,output
Вопросы:
- Есть ли лучший подход?
- Как я могу избавиться от переменной
inProgressCount
? Почему подписка done$ вызывается только один раз при запросе нескольких элементов?
Обновление:
Ответ на вопрос № 3: switchMap такой же, как flatMapLatest, поэтому я получил только последний. Обновлен код на flatMap вместо switchMap.