Наблюдаемый буфер, пока не завершится другой наблюдаемый

Я использую RxSwift для завершения процесса синхронизации сервера мобильного приложения. У меня есть Observable<RemoteEvent>, который обертывает соединение через веб-сокет и выдает каждое полученное сообщение как Event. Точно так же у меня есть Observable<SynchronizationResult>, который обертывает процесс синхронизации API. Как только мое приложение открывает соединение WebSocket, сервер отправляет hello сообщение. После получения этого сообщения я хочу запустить процесс синхронизации и буферизовать любые события, пока синхронизация не будет завершена. Вот где я борюсь. В настоящее время у меня есть:

self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in
  guard event.type == "hello" else {
    return (state.0?.concat(Observable.just(event)), state.1 + [event])
  }

  // This is the sync operation
  return (
    self.synchronizationService
      .synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true)
      .toArray()
      .flatMap { results -> Observable<RemoteEvent> in
        (state.1 + [event]).toObservable()
      },
    []
  )
}
.flatMapLatest { $0.0 ?? Observable.empty() }

Несмотря на то, что это довольно уродливо, в нем также есть существенная ошибка: любое входящее событие приводит к повторной подписке на синхронизацию Observable, которая затем перезапускает весь процесс синхронизации. Я уверен, что должен быть лучший способ сделать это.


person Colin M    schedule 27.08.2016    source источник
comment
Будет ли hello первым событием, создаваемым веб-сокетом? Если нет, должны ли буферизироваться какие-либо события до hello или также должны буферизоваться что-нибудь после hello / sync-start? Этот вариант использования кажется странным, но я проигнорирую его и приму его за чистую монету, поскольку я предполагаю, что вы изменили свой вопрос, чтобы он был простой версией вашего фактического варианта использования.   -  person solidcell    schedule 28.08.2016
comment
@solidcell Можно с уверенностью предположить, что hello будет первым событием. Все, что до этого (практически ничего), можно игнорировать. Вы правы, что это и странный вариант использования, и упрощенная версия проблемы. В конечном счете, однако, для приложения, которое у меня есть, это самый простой способ предотвратить состояние гонки (например, если происходит синхронизация, а затем подключается веб-сокет, события могут быть пропущены; если веб-сокет подключается и синхронизируется без буфера, события могут быть обработаны до того, как станут доступны подтверждающие данные). Это решение в основном временное, пока не будут решены другие проблемы.   -  person Colin M    schedule 28.08.2016
comment
Как это возможно ?: если происходит синхронизация, а затем подключается веб-сокет. Если веб-сокету необходимо подключиться, чтобы появилось приветственное сообщение, чтобы мы попытались запустить синхронизацию, тогда как могло случиться так, что синхронизация может произойти до подключения веб-сокета?   -  person solidcell    schedule 28.08.2016
comment
Извините, я объяснял, почему я сейчас именно в этом направлении. Это правильно, что веб-сокет отправляет приветствие, и после этого происходит синхронизация. Я предлагал, почему у меня сейчас нет такой настройки, чтобы синхронизация происходила перед подключением.   -  person Colin M    schedule 28.08.2016


Ответы (1)


Вот как можно получить нужную функциональность:

// this is a stub for the purpose of the example
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let websocketEvents = interval
    .map { i -> String in
        if i == 1 {
            return "hello"
        } else {
            return String(i)
        }
    }
    .replayAll()

websocketEvents.connect()

func performSync() -> Observable<Void> {
    return Observable<Void>.create { o in
        print("starting sync")
        // actually start sync with server
        // ....
        delay(2.0) {
            print("sync finished")
            o.onNext(())
        }
        return NopDisposable.instance
    }
}

// websocket events as they come, regardless of sync status
websocketEvents
    .subscribeNext { e in
        print("websocket event received: \(e)")
    }

// all websocket events, buffered and only emitted post-sync
websocketEvents
    .filter { $0 == "hello" }
    .flatMapLatest { _ in performSync() }
    .flatMapLatest { _ in websocketEvents }
    .subscribeNext { e in
        print("websocket event post sync: \(e)")
    }

Это выведет:

событие websocket получено: 0
событие websocket получено: привет
начало синхронизации
получено событие websocket: 2
получено событие websocket: 3
синхронизация завершена
синхронизация сообщения события websocket: 0
Синхронизация сообщений о событиях websocket: привет
Синхронизация сообщений событий websocket: 2
Синхронизация сообщений событий websocket: 3
полученных событий websocket: 4
Синхронизация сообщений событий websocket: 4
полученных событий websocket: 5
Синхронизация сообщений о событиях websocket: 5

person solidcell    schedule 29.08.2016
comment
Это круто! Один вопрос - правильно ли я предполагаю, что replayAll будет продолжать удерживать все элементы, отправленные из потока, даже когда они были воспроизведены? Есть ли здесь опасения по поводу использования памяти? Мой текущий (очень хакерский и сломанный) подход накладывает ограничение на максимальный размер буфера и выдает ошибку из потока, если он не может завершить синхронизацию до того, как записи x будут буферизованы (не было указано в вопросе для краткости) - person Colin M; 30.08.2016
comment
Да, это определенно повод для беспокойства. Есть replay(bufferCount: Int), который может ограничить размер буфера. Однако вам следует подумать о другом подходе к вашей ситуации синхронизации вместо того, чтобы буферизовать элементы вообще, если это возможно. - person solidcell; 30.08.2016
comment
Спасибо за разъяснения. Что касается синхронизации, я использовал несколько разных подходов. Тот, которого я больше всего поклонник, имеет сервер, хранящий кеш событий за последние x продолжительности. Затем при подключении он просто сбрасывает все, что вы пропустили, поэтому клиенту не нужно выполнять дополнительную работу. Проблема в том, что (в зависимости от приложения и типов событий) это может привести к значительно большему использованию данных, чем выполнение вызовов API. - person Colin M; 30.08.2016