Предотвращение противодавления с помощью горячих наблюдаемых

Я использую библиотеку Android ReactiveLocation для получения регулярных обновлений местоположения. Я хочу продолжать получать обновления местоположения, даже если в моем приложении они не используются, чтобы у меня всегда была актуальная информация о местоположении, которую я мог бы использовать немедленно, если мне нужно.

Вот как я начинаю получать обновления местоположения в основном компоненте моего приложения. Я хочу повторно опубликовать последнее значение и любые новые обнаруженные местоположения любому подписчику, расположенному дальше по цепочке в моем приложении, и для этого предназначен replay(1).

locationProvider = new ReactiveLocationProvider(context);
locationObservable = locationProvider.getUpdatedLocation(locationRequest)
.replay(1);

В другом месте приложения я подписываюсь на эту переизданную наблюдаемую:

locationSubscription = locationObservable
        .filter(new Func1<Location, Boolean>() {
            @Override
            public Boolean call(Location location) {
                return location.getAccuracy() < LOCATION_ACCURACY_THRESHOLD;
            }
        })
        .subscribe(new Action1<Location>() {
            @Override
            public void call(Location location) {
            }
        });

Похоже, это работает: моему конечному подписчику немедленно возвращается последнее местоположение, и он продолжает получать новые обновления, но я хочу убедиться, что я не создаю огромный буфер неиспользуемых местоположений где-то в цепочке, когда последний подписчик не подписан. Я нуб Rx. Как противодавление применимо к этой ситуации? Делает ли replay(1) то, что я ожидаю, и отбрасывает все ненужные местоположения, кроме последнего?


person Graham Borland    schedule 16.10.2015    source источник


Ответы (1)


Есть несколько факторов, влияющих на повтор. replay(1) выполняет согласование противодавления и воспроизводит значения по запросу абонента.

Это означает, что если подписчик находится за асинхронной границей, replay начнет накапливать значения. Скорость, с которой это происходит, зависит от текущего набора подписчиков: оператор запрашивает максимум того, что запрашивают подписчики, поэтому смешивание ограниченных и неограниченных подписчиков может привести к раздуванию буфера, если они потребляют с разной скоростью.

Вы можете думать о его буфере как об односвязном списке, где каждый подписчик указывает на узел в этом списке, а оператор указывает непосредственно перед концом списка. Из-за одиночных ссылок, если нет подписчиков, начало списка может быть удалено сборщиком мусора.

Если был большой запросчик, отписавшийся от подписки, replay() "сбросит" лишние значения и продолжит удерживать по 1 элементу за раз, медленно "стекая".

Альтернативой является использование BehaviorSubject, которое запоминает самое последнее значение и вообще не выполняет обратное давление, поэтому нет риска раздувания буфера, но есть риск MissingBackpressureException.

Изменить:

На самом деле, я думаю, что в replay() есть ошибка обратного давления, поэтому холодные источники могут работать неправильно.

person akarnokd    schedule 16.10.2015
comment
Итак, если я вас правильно понял, если все происходит в основном потоке Android (без переключения планировщика — это то, что вы имеете в виду под асинхронной границей?) и у меня есть только один подписчик, то я думаю, что он делает то, что я хочу. - person Graham Borland; 17.10.2015
comment
Асинхронная граница (в основном), когда вы применяете оператор с параметром планировщика - person akarnokd; 17.10.2015