Я использую RxSwift для завершения процесса синхронизации сервера мобильного приложения. У меня есть Observable<RemoteEvent>
, который обертывает соединение через веб-сокет и выдает каждое полученное сообщение как Event
. Точно так же у меня есть Observable<SynchronizationResult>
, который обертывает процесс синхронизации API. Как только мое приложение открывает соединение WebSocket, сервер отправляет hello
сообщение. После получения этого сообщения я хочу запустить процесс синхронизации и буферизовать любые события, пока синхронизация не будет завершена. Вот где я борюсь. В настоящее время у меня есть:
self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in
guard event.type == "hello" else {
return (state.0?.concat(Observable.just(event)), state.1 + [event])
}
// This is the sync operation
return (
self.synchronizationService
.synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true)
.toArray()
.flatMap { results -> Observable<RemoteEvent> in
(state.1 + [event]).toObservable()
},
[]
)
}
.flatMapLatest { $0.0 ?? Observable.empty() }
Несмотря на то, что это довольно уродливо, в нем также есть существенная ошибка: любое входящее событие приводит к повторной подписке на синхронизацию Observable
, которая затем перезапускает весь процесс синхронизации. Я уверен, что должен быть лучший способ сделать это.
hello
будет первым событием. Все, что до этого (практически ничего), можно игнорировать. Вы правы, что это и странный вариант использования, и упрощенная версия проблемы. В конечном счете, однако, для приложения, которое у меня есть, это самый простой способ предотвратить состояние гонки (например, если происходит синхронизация, а затем подключается веб-сокет, события могут быть пропущены; если веб-сокет подключается и синхронизируется без буфера, события могут быть обработаны до того, как станут доступны подтверждающие данные). Это решение в основном временное, пока не будут решены другие проблемы. - person Colin M   schedule 28.08.2016