Реактивное программирование с использованием RxScala

У меня есть Observable, который подключается к службе через протокол Socket. Подключение к сокету происходит через клиентскую библиотеку. В клиентской библиотеке, которую я использую, есть java.util.Observer, с помощью которой я могу зарегистрироваться для событий, которые в нее помещаются.

final class MyObservable extends Observable[MyEvent] {

  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

У меня есть два открытых вопроса, которые я не понимаю.

Как я могу получить результат шага 3 в моем подписчике?

Каждый раз, когда я получаю MyEvent с подписчиком, как показано ниже, я вижу, что создается новое соединение. В конце концов, Шаг 1, Шаг 2 и Шаг 3 выполняются для каждого входящего события.

val myObservable = new MyObservale()
myObservable.subscribe()

person joesan    schedule 24.07.2015    source источник
comment
Где твой Subscriber? Не могли бы вы использовать myObservable.subscribe(mySubscriber)?   -  person zsxwing    schedule 26.07.2015
comment
Но как передать результат шага 3 в методе подписки класса MyObservable во внешний мир?   -  person joesan    schedule 27.07.2015
comment
Существует ли библиотека RxScala, достойная внимания? Как поживает RxScala от NetFlix? Есть ли у него возможность обратного давления?   -  person joesan    schedule 27.07.2015
comment
Разве вы еще не используете RxScala?   -  person paulpdaniels    schedule 27.07.2015
comment
Я использую библиотеку monifu, которая является реализацией, вдохновленной Rx.NET. В библиотеке monifu есть много вкусностей, которые на данный момент мне немного трудно понять!   -  person joesan    schedule 27.07.2015
comment
RxScala поддерживает обратное давление. Это оболочка RxJava.   -  person zsxwing    schedule 27.07.2015
comment
Есть ли пример использования RxScala для описанной выше ситуации? Я имею в виду, что все, что я хочу сделать, это получить результат шага 3 в MyObservable! и это должно происходить, когда кто-то подписывается на MyObservable!   -  person joesan    schedule 27.07.2015


Ответы (1)


Если я не правильно понял ваш вопрос, просто позвоните onNext:

def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)

  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}

и код, который подписывается, будет делать что-то вроде:

myObservable.subscribe(onNext = println(_))
person Brandon    schedule 29.07.2015