Использование нескольких ResponseObserver.onNext приведет к зависанию службы - gRPC Java

Я хочу отправлять данные кусками по 50 с помощью gRPC, однако при запуске responseObserver.onNext несколько раз он просто застревает и не отправляет данные. Но когда я останавливаю поток, я получаю данные.

Это мой код:

// List<List<MyClass>> listOfMyClass
for (List<MyClass> myClasses : listOfMyClass) {
  responseObserver.onNext(buildReply(myClasses));
}
responseObserver.onCompleted();

Это заставит его застрять. Но если я запущу только responseObserver.onNext(buildReply(myClasses)); один раз, я получу данные мгновенно.

Мой прото:

message Request {
    string number = 1;
}

message Reply {
    repeated CustomMessage results = 1;
}

service Service {
    rpc MyRequest (Request) returns (stream Reply) {}
}

Я использовал графический интерфейс под названием https://github.com/uw-labs/bloomrpc, который должен легко отображать поток.


person Sebastian Berglönn    schedule 09.10.2019    source источник


Ответы (1)


Интерфейс StreamObserver не является блокирующим и не оказывает обратного давления на исходящие данные. Чтобы дать клиенту обратное давление, вам необходимо преобразовать StreamObserver, который клиент использует для отправки данных в ServerCallStreamObserver, и наблюдать за битом «готово». Что-то вроде этого:

ClientCallStreamObserver<MyClass> clientCallStreamResponseObserver =
      (ClientCallStreamObserver<File>) responseObserver;
clientCallStreamObserver.setOnReadyHandler(new Runnable() {
    public void run() {
      sendUntilNotReady(clientCallStreamResponseObserver);
    }
});
sendUntilNotReady(clientCallStreamResponseObserver);

...

private void sendUntilNotReady(ClientCallStreamObserver<MyClass> clientCallStreamResponseObserver) {
  while (clientCallStreamResponseObserver.isReady()) {
    clientCallStreamResponseObserver.onNext(... /* build reply for next item in listOfMyClass */);
  }
}
person voidzcy    schedule 23.10.2019