rx: развернуть массив на несколько потоков

У меня есть поток, содержащий массив, каждый элемент которого имеет идентификатор. Мне нужно разделить это на поток для каждого идентификатора, который завершится, когда исходный поток больше не несет идентификатор.

Например. последовательность входного потока с этими тремя значениями

[{a:1}, {b:1}]    [{a:2}, {b:2}, {c:1}]     [{b:3}, {c:2}]

должен вернуть три потока

a -> 1 2 |
b -> 1 2 3
c ->   1 2

Где a завершилось на 3-м значении, так как его идентификатор исчез, а c был создан на 2-м значении, так как его идентификатор появился.

Я пытаюсь groupByUntil, что-то вроде

 var input = foo.share();              
 var output = input.selectMany(function (s) {
                        return rx.Observable.fromArray(s);
                }).groupByUntil(
                        function (s) { return s.keys()[0]; },
                        null,
                        function (g) { return input.filter(
                                function (s) { return !findkey(s, g.key); }
                        ); }
                )

Итак, сгруппируйте по идентификатору и удалите группу, когда входной поток больше не имеет идентификатора. Кажется, это работает, но два варианта использования ввода кажутся мне странными, как будто может возникнуть странная зависимость порядка при использовании одного потока для управления вводом groupByUntil и удалением групп.

Есть ли способ лучше?

обновить

Здесь действительно существует странная проблема со временем. fromArray по умолчанию использует планировщик currentThread, в результате чего события из этого массива будут чередоваться с событиями из ввода. Затем условия удаления для группы оцениваются в неподходящее время (до обработки групп из предыдущего ввода).

Возможный обходной путь — использовать fromArray(.., rx.Scheduler.immediate), что позволит синхронизировать сгруппированные события с вводом.


person user1009908    schedule 19.02.2014    source источник


Ответы (1)


да, единственная альтернатива, о которой я могу думать, - это управлять состоянием самостоятельно. Хотя не знаю, что лучше.

var d = Object.create(null);
var output = input
    .flatMap(function (s) {
        // end completed groups
        Object
            .keys(d)
            .filter(function (k) { return !findKey(s, k); })
            .forEach(function (k) {
                d[k].onNext(1);
                d[k].onCompleted();
                delete d[k];
            });
        return Rx.Observable.fromArray(s);
    })
    .groupByUntil(
        function (s) { return s.keys()[0]; },
        null,
        function (g) { return d[g.key] = new Rx.AsyncSubject(); });
person Brandon    schedule 19.02.2014