Как мне использовать Rx + DynamicData для периодической проверки обновлений от многих онлайн-сервисов?

У меня есть базовое приложение Calendar / Agenda, в котором будут перечислены самые последние события из ряда учетных записей и календарей. Например, предположим, что у меня есть 3 учетных записи: две разные учетные записи Microsoft и одна учетная запись Google. В настоящее время я храню их как SourceCache<Account, string> с именем Accounts в службе (AccountsService).

SourceCache<T1,T2> является частью DynamicData ... которая в основном составляет реактивные коллекции. Я хочу, чтобы он был реактивным, чтобы при добавлении или удалении учетной записи все в приложении (страница настроек, страницы календаря и т. Д.) Обновлялось автоматически.

Теперь в каждой учетной записи может быть несколько Calendar. И для каждого из этих Calendar я хочу загрузить все предстоящие CalendarEvent. Уловка в том, что мне нужно делать это через регулярные промежутки времени, чтобы видеть, были ли добавлены новые события или были ли они изменены.

Вот как я сейчас это делаю, но, боюсь, это действительно плохой рецепт.

var calendarSet = this.accountsService.Accounts.Connect()
    .ObserveOn(RxApp.TaskpoolScheduler)
    .TransformMany(x =>
    {
        ReadOnlyObservableCollection<Models.Calendar> subCalendars;
        x.CalendarService.Calendars.Connect()
            .AutoRefreshOnObservable(calendar => calendar.IsEnabledChanged)
            .AutoRefreshOnObservable(calendar => calendar.IsColorChanged)
            .Filter(calendar=>calendar.IsEnabled)
            .Bind(out subCalendars)
            .Subscribe();
        return subCalendars;
     }, x => x.CacheKey)
     .ObserveOnDispatcher()
     .Publish();

calendarSet
    .Bind(out calendars)
    .Subscribe();


var eventSet = calendarSet
    .ObserveOn(RxApp.TaskpoolScheduler)
    .Transform( calendar =>
    {
        var events = new List<Models.CalendarEvent>();
        Debug.WriteLine(calendar.Name);
        calendar.CalendarService.CalendarEventsObservable(calendar).Subscribe(items =>
        {
            events.AddRange(items);
        });
        return events;
    })
    .TransformMany(x => x, x => x.Key)
    .Filter(x => x.EndDateTime > DateTimeOffset.Now)
    .Sort(new Models.CalendarEventSorter())
    .ObserveOnDispatcher()
    .Bind(out calendarEvents)
    .Subscribe();

calendarSet.Connect();

Большая часть - это то, где события также загружаются через подписку на наблюдаемое. Здесь я поставил таймер, который позволяет мне контролировать, как часто проверять онлайн-сервисы. Выглядит это так (20 секунд только на тестирование!):

public IObservable<List<Models.CalendarEvent>> CalendarEventsObservable(Models.Calendar calendar)
    {
        var obs = Observable.Interval(TimeSpan.FromSeconds(20)).SelectMany(async x =>
        {
            var items = await GetAllEventsForCalendarAsync(calendar);
            return items;
        });

        return obs;
    }

Кажется, это работает! Я могу включить / отключить определенные календари, и события будут появляться или исчезать из моего связанного списка. Я вижу, что события обновляются через регулярные промежутки времени ... и я предполагаю, что, поскольку я использую TransformMany с ключом, привязанным к онлайн-идентификатору CalenderEvents (который исправлен), недавно загруженные события просто заменяют старые в кеше . Я не вижу мерцания в пользовательском интерфейсе.

** Исправление: похоже, это работает из-за взлома, который я случайно оставил после другого испытания. В исходных учетных записях SourceCache я запускаю таймер, который вызывает Accounts.Refresh (). Если вынуть это, ничего не работает.

Это правильный способ сделать это? Пожалуйста, просветите меня ... Я немного борюсь с Rx и DynamicData. Есть так много операторов, с которыми я пока не знаю, что делать.

Спасибо!


person Lee McPherson    schedule 02.12.2018    source источник


Ответы (2)


Кажется, вы ответили на свой вопрос, и код выглядит более оптимальным, чем оригинал. Однако я предлагаю несколько оптимизаций:

  1. Используйте AutoRefreshOnObservable, который позволит вам создать один наблюдаемый, а не использовать AutoRefresh дважды.

  2. По коду я не могу сказать, возвращает ли CalendarEventsObservable одно значение или коллекцию. Если он возвращает коллекцию, вместо использования AddOrUpdate вы можете использовать EditDiff. EditDiff ожидает, что будет указано равенство по сравнению, и предотвращает запуск ненужных уведомлений.

person Roland Pheasant    schedule 04.12.2018

Я немного переписал код, и, похоже, он работает намного лучше. Он работает даже без хака, когда я вручную вызываю Refresh() на исходном Account SourceCache.

Я понял, что неправильно использовал метод публикации. Кроме того, подписка на наблюдаемый объект в операторе преобразования не сработает, поскольку он не является наблюдаемым объектом DynamicData (с наборами изменений). Вместо этого я решил сделать SubscribeMany, чтобы подписаться на все CalendarEventObservable от каждого Calendar, и в логике действий подписки я бы заполнил новый SourceCache из CalendarEvents. Никакие события не будут удалены, но повторяющиеся события просто перезапишут старые из-за ключа кеша, и я смог отфильтровать события из не выбранных каледаров.

 private SourceCache<Models.CalendarEvent, string> calendarEventCache;
 public IObservableCache<Models.CalendarEvent, string> CalendarEventCache => calendarEventCache.Connect().AutoRefreshOnObservable(x=>x.Parent.IsEnabledChanged).Filter(x=>x.Parent.IsEnabled).AsObservableCache();

//////////////////
///IN CONSTRUCTOR:

        calendarEventCache = new SourceCache<Models.CalendarEvent, string>(x => x.Key);

        calendarsCache = this.accountsService.Accounts.Connect()
            .ObserveOn(RxApp.TaskpoolScheduler)
            .TransformMany(x =>
            {
                ReadOnlyObservableCollection<Models.Calendar> subCalendars;
                x.CalendarService.Calendars.Connect()
                            .AutoRefreshOnObservable(calendar => calendar.IsEnabledChanged)
                            .AutoRefreshOnObservable(calendar => calendar.IsColorChanged)
                            .Filter(calendar => calendar.IsEnabled)
                            .Bind(out subCalendars)
                            .Subscribe();
                return subCalendars;
            }, x => x.CacheKey)
            .ObserveOnDispatcher()
            .AsObservableCache();

        calendarsCache.Connect()
            .ObserveOn(RxApp.TaskpoolScheduler)
            .SubscribeMany(calendar =>
            {
                return calendar.CalendarService.CalendarEventsObservable(calendar).Subscribe(calendarEvent =>
               {
                   calendarEventCache.AddOrUpdate(calendarEvent);
               });
            })
            .Subscribe();

Теперь в моей модели просмотра пользовательского интерфейса я подписываюсь на SourceCache<CalendarEvent,string>;

person Lee McPherson    schedule 03.12.2018