Vert.x — подписки GraphQL с DataInputStreams

У меня есть сторонний код, к которому я подключаюсь через DataInputStream. Сторонний код постоянно выдает информацию по мере ее создания. Когда появляется что-то интересное, я хочу передать это подписке GraphQL.

Я не уверен, как связать сторонний код с кодом подписки GraphQL на стороне сервера, учитывая этот сценарий. Мы ценим любые предложения.

Некоторый концептуальный код приведен ниже:

public void liveStream(DataInputStream in) {
  // Sit and constantly watch input stream and report when messages come in
  while(true) {
    SomeMessage message = readFromInputStream(in);
    System.out.println("Received Message Type:" + message.getType());

    // Convert SomeMessage into the appropriate class based on its type
    if (message.getType() == "foo") {
      Foo foo = convertMessageToFoo(message);
    } else if (message.getType() == "bar") {
      Bar bar = convertMessageToBar(message);
    } else if (howeverManyMoreOfThese) {
      // Keep converting to different objects
    }
  }       
}

// The client code will eventually trigger this method when 
// the GraphQL Subscription query is sent over
VertxDataFetcher<Publisher<SomeClassTBD>> myTestDataFetcher() {
  return new VertxDataFetcher<> (env, future) -> {
    try {
      future.complete(myTest());
    } catch(Exception e) {
      future.fail(e);
    }
  });
}

person ekjcfn3902039    schedule 15.10.2019    source источник
comment
Вот пример сервера Vert.x GraphQL с подпиской github.com/vert-x3/vertx-examples/tree/master/ HTH   -  person tsegismont    schedule 16.10.2019


Ответы (1)


Хорошо, я обернул свой код liveStream в ObservableOnSubscribe с помощью executorService, и я получаю обратно все данные. Думаю, теперь я могу либо передать его прямо во внешний интерфейс, либо создать отдельные издатели для работы с определенными типами объектов, а подписки graphql укажут на соответствующих издателей.

ExecutorService executor = Executors.newSingleThreadExecutor;

ObservableOnSubscribe<SomeClassTBD> handler = emitter ->
  executor.submit(() -> {
    try {
      //liveStream code here
      emitter.onComplete();
    }
    catch(Exception e) {
      emitter.onError(e);
    }
    finally {
      // Cleanup here
    }
  });
  Observable<SomeClassTBD> = Observable.create(handler);
person ekjcfn3902039    schedule 16.10.2019