In a server-side streaming scenario, we publish a large number of items to
a "ServerCallStreamObserver" through its "onNext()" call. As recommended,
we want to handle back pressure by checking the readiness of the stream
before sending each message, using ServerCallStreamObserver's "isReady()"
method. If the stream is not ready, we put the thread to sleep until the
ServerCallStreamObserver is ready again.

To detect that, we rely on setOnReadyHandler(Runnable onReadyHandler), which
is supposed to be invoked when the ServerCallStreamObserver's isReady()
transitions from false to true. But this leads to a blocked thread: while
the main thread sleeps waiting to resume, the "onReadyHandler" is never
invoked, so publishing to the stream stalls. From the logs I later saw that
the "onReadyHandler" is itself waiting to be invoked by that same main
thread.

Sample program:
@Override
public void streamNumbers(NumberRequest request,
        StreamObserver<NumberResponse> responseObserver) {
    int count = request.getCount();
    this.serverResponseObserver =
            (ServerCallStreamObserver<NumberResponse>) responseObserver;
    this.onReadyHandler = new OnReadyHandler();
    serverResponseObserver.setOnReadyHandler(onReadyHandler);
    //serverResponseObserver.disableAutoRequest();
    System.out.println("Stream numbers start --> " +
            Thread.currentThread().getName());
    for (int i = 0; i <= count; i++) {
        NumberResponse response =
                NumberResponse.newBuilder().setNumber(i).build();
        if (!serverResponseObserver.isReady()) {
            System.out.println("serverResponseObserver not ready----->"
                    + serverNotReadyCount++ + "current item -->" + i);
            this.onReadyHandler.wasReady = false;
            // Sleep until the handler reports readiness. This is the loop
            // that never exits in the scenario described above.
            while (!onReadyHandler.wasReady) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before propagating.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        serverResponseObserver.onNext(response);
    }
    serverResponseObserver.onCompleted();
}

class OnReadyHandler implements Runnable {
    // volatile so that a write made by the handler's thread would be
    // visible to the sleeping loop in streamNumbers().
    private volatile boolean wasReady = false;

    @Override
    public void run() {
        System.out.println(" OnReadyHandler inside run->" +
                serverResponseObserver.isReady() + wasReady +
                onReadyHandlerRunCount++ +
                Thread.currentThread().getName());
        if (serverResponseObserver.isReady() && !wasReady) {
            System.out.println(" Response observer is ready and the "
                    + "Backpressure is released --> ");
            wasReady = true;
        }
    }
}
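
For comparison, one non-blocking alternative is to do the sending inside the
onReadyHandler itself and return from streamNumbers() right away, rather
than sleeping in the service method. The following is only a minimal sketch
of that idea under stated assumptions: FakeObserver, transportReady() and
DrainOnReadySketch are hypothetical stand-ins so the example runs without
gRPC on the classpath. They are not part of the gRPC API; only the names
isReady(), onNext(), onCompleted() and setOnReadyHandler() mirror real
ServerCallStreamObserver methods.

```java
import java.util.ArrayList;
import java.util.List;

class DrainOnReadySketch {

    /** Hypothetical stand-in for ServerCallStreamObserver; NOT the gRPC class. */
    static class FakeObserver {
        final List<Integer> sent = new ArrayList<>();
        boolean completed = false;
        private final int capacity;   // messages accepted before "not ready"
        private int buffered = 0;
        private Runnable onReadyHandler;

        FakeObserver(int capacity) { this.capacity = capacity; }

        boolean isReady() { return buffered < capacity; }
        void setOnReadyHandler(Runnable handler) { this.onReadyHandler = handler; }
        void onNext(int value) { sent.add(value); buffered++; }
        void onCompleted() { completed = true; }

        /** Simulates the transport draining its buffer and firing the handler. */
        void transportReady() {
            buffered = 0;
            if (onReadyHandler != null) {
                onReadyHandler.run();
            }
        }
    }

    /** Registers a handler that sends while ready, then returns without blocking. */
    static void streamNumbers(int count, FakeObserver observer) {
        int[] next = {0}; // position in the stream, kept across handler calls
        observer.setOnReadyHandler(() -> {
            // Same bound as the sample program (i <= count).
            while (observer.isReady() && next[0] <= count) {
                observer.onNext(next[0]++);
            }
            if (next[0] > count && !observer.completed) {
                observer.onCompleted();
            }
        });
        // No loop and no sleep here: the method simply returns.
    }

    /** Drives the fake transport until the stream completes; returns the rounds used. */
    static int run(int count, FakeObserver observer) {
        streamNumbers(count, observer);
        int rounds = 0;
        while (!observer.completed) {
            observer.transportReady();
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        FakeObserver observer = new FakeObserver(3);
        int rounds = run(10, observer);
        System.out.println("sent " + observer.sent.size()
                + " items in " + rounds + " rounds");
    }
}
```

With a real ServerCallStreamObserver, the same handler body would presumably
be registered in streamNumbers(), with gRPC rather than a driver loop
invoking it whenever readiness changes.
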
I would appreciate any input from the community on this implementation, or
ideas on how to handle the backpressure correctly.