У меня есть поток, содержащий массив, каждый элемент которого имеет идентификатор. Мне нужно разделить это на поток для каждого идентификатора, который завершится, когда исходный поток больше не несет идентификатор.
Например. последовательность входного потока с этими тремя значениями
[{a:1}, {b:1}] [{a:2}, {b:2}, {c:1}] [{b:3}, {c:2}]
должен вернуть три потока
a -> 1 2 |
b -> 1 2 3
c -> 1 2
Где a завершилось на 3-м значении, так как его идентификатор исчез, а c был создан на 2-м значении, так как его идентификатор появился.
Я пытаюсь groupByUntil, что-то вроде
var input = foo.share();
var output = input.selectMany(function (s) {
return rx.Observable.fromArray(s);
}).groupByUntil(
function (s) { return s.keys()[0]; },
null,
function (g) { return input.filter(
function (s) { return !findkey(s, g.key); }
); }
)
Итак, сгруппируйте по идентификатору и удалите группу, когда входной поток больше не имеет идентификатора. Кажется, это работает, но два варианта использования ввода кажутся мне странными, как будто может возникнуть странная зависимость порядка при использовании одного потока для управления вводом groupByUntil и удалением групп.
Есть ли способ лучше?
обновить
Здесь действительно существует странная проблема со временем. fromArray по умолчанию использует планировщик currentThread, в результате чего события из этого массива будут чередоваться с событиями из ввода. Затем условия удаления для группы оцениваются в неподходящее время (до обработки групп из предыдущего ввода).
Возможный обходной путь — использовать fromArray(.., rx.Scheduler.immediate), что позволит синхронизировать сгруппированные события с вводом.