Как создать наблюдаемую разницу между двумя отсортированными наблюдаемыми?

Скажем, у меня есть две наблюдаемые отсортированных двойников. Я хотел бы получить разницу между ними как наблюдаемую. Например:

           1       2           4        
 left:   ──o───────o───────────o────/
           1    3  4   5
 right:  ──o────o──o───o───/
                   l2          r3   r5
 output: ──────────o───────────o────o─/

Императивная реализация для этого проста: держите список элементов на той стороне, до которой вы еще не дошли, и «испускайте» элементы с другой стороны.

Каков канонический подход к этому в мире RFP? Я специально использую RxScala.


person Omer van Kloeten    schedule 03.01.2017    source источник
comment
Можете ли вы предоставить мраморную диаграмму, показывающую, когда испускаются значения из источников и как это приводит к полученному наблюдаемому?   -  person Enigmativity    schedule 04.01.2017
comment
@Enigmativity Я добавил мраморную диаграмму. ХТН   -  person Omer van Kloeten    schedule 04.01.2017
comment
Это хорошая мраморная диаграмма. Как ты это сделал? Я до сих пор не могу понять правило. Можете ли вы объяснить это, пожалуйста?   -  person Enigmativity    schedule 04.01.2017
comment
@Enigmativity это кустарная мраморная диаграмма ручной работы, сделанная только из лучших символов ASCII (иначе я сделал ее вручную). простое объяснение состоит в том, что я хочу, чтобы разница между обоими наблюдаемыми была отсортирована, и как только я смогу понять, что она не существует ни с одной из сторон.   -  person Omer van Kloeten    schedule 04.01.2017
comment
Возможно, вам придется попробовать еще раз - я хочу, чтобы разница между обоими наблюдаемыми была отсортирована, и как только я смогу понять, что она не существует ни с одной из сторон - это просто не имеет для меня никакого смысла, извините .   -  person Enigmativity    schedule 05.01.2017
comment
@Enigmativity Оба источника передают отсортированные данные. Я хочу узнать, как можно скорее, какие элементы не существуют ни с той, ни с другой стороны. Я не хочу ждать, пока потоки закончатся. Технически это возможно, и императивная реализация проста.   -  person Omer van Kloeten    schedule 06.01.2017


Ответы (2)


Вот как я бы сделал это в rxjava, подразумевая, что две наблюдаемые имеют одинаковую длину.

    Observable<Integer> obs1 = Observable.just(1, 2, 4, 6);
    Observable<Integer> obs2 = Observable.just(1, 3, 4, 5);

    obs1.zipWith(obs2, (integer, integer2) -> {
        if (!Objects.equals(integer, integer2)) {
            return Observable.just(integer).concatWith(Observable.just(integer2));
        } else {
            return Observable.empty();
        }
    })
      .flatMap(observable -> observable)
      .sorted()
      .forEach(System.out::println);

ИЗМЕНИТЬ

Другим подходом было бы использование коллекции

    Observable<Integer> obs1 = Observable.just(1, 2, 4);
    Observable<Integer> obs2 = Observable.just(1, 3, 4, 5);


    obs1.mergeWith(obs2)
            .sorted()
            .reduce(new ArrayList<Integer>(), (integers, integer) -> {
                if (integers.contains(integer)) {
                    integers.remove(integer);
                } else {
                    integers.add(integer);
                }
                return integers;
            })
            .flatMapIterable(integers -> integers)
            .forEach(System.out::println);
person Alexander Perfilyev    schedule 03.01.2017
comment
Здесь вы делаете два предположения: 1. Наблюдаемые имеют одинаковую длину (я обновил вопрос, чтобы показать, что это не так) и; 2. Одна из наблюдаемых не может опережать другую (они движутся с одинаковой скоростью). - person Omer van Kloeten; 04.01.2017
comment
Разве этот новый подход не потребует блокировки до тех пор, пока обе наблюдаемые не будут завершены? - person Omer van Kloeten; 04.01.2017
comment
@OmervanKloeten нет, блокировки нет, все операторы производят неблокирующие наблюдаемые. - person Alexander Perfilyev; 04.01.2017
comment
Я имел в виду, что оператор «сортировки» ожидает возврата всех элементов из обоих источников, прежде чем выдать свой первый результат. - person Omer van Kloeten; 04.01.2017
comment
@OmervanKloeten, вы можете удалить его и получить тот же результат. - person Alexander Perfilyev; 04.01.2017
comment
Ага, понятно. Ты прав. Однако разве вы не накапливаете весь полученный наблюдаемый объект в памяти (с помощью сокращения) перед его выпуском, эффективно блокируя следующий шаг (flatMapIterable)? - person Omer van Kloeten; 04.01.2017
comment
@OmervanKloeten да, он работает как оператор .distinct(), сохраняя все промежуточные результаты в буфере памяти. - person Alexander Perfilyev; 04.01.2017
comment
Да, такой отличный ответ, и спасибо, что дали и объяснили его, но я искал версию, в которой результаты будут опубликованы подписчикам, как только они станут доступны. - person Omer van Kloeten; 04.01.2017

Я не думаю, что в RxJava есть много способов сделать эту симметричную разницу «в реальном времени» ... Главным образом потому, что ваше исходное предположение (наблюдаемые сортируются) не может быть сделано с помощью общих операторов, поэтому немногие помогут вам с этим.

Однако вы можете попытаться написать для этой цели собственный оператор, вдохновленный sequenceEqual: он внутренне продвигается шаг за шагом между двумя издателями для выполнения сравнения на равенство, так что это близко к тому, что вы хотите сделать.

person Simon Baslé    schedule 05.01.2017