In a server-side streaming scenario, we publish a large number of items to 
a ServerCallStreamObserver through onNext(). As recommended, we need to 
handle back pressure by checking the readiness of the stream before sending 
each message, so we use ServerCallStreamObserver's isReady(). If the stream 
is not ready, we put the thread to sleep until the ServerCallStreamObserver 
is ready again. 

To do so we rely on setOnReadyHandler(Runnable onReadyHandler), which is 
supposed to be invoked when the ServerCallStreamObserver's isReady() 
transitions from false to true. But this leads to a blocked thread: while 
the main thread sleeps and waits to resume, the onReadyHandler is never 
invoked, so publishing to the stream stops. From the logs I later saw that 
the onReadyHandler appears to be waiting to be invoked by the main thread 
itself. Sample program:

    @Override
    public void streamNumbers(NumberRequest request,
            StreamObserver<NumberResponse> responseObserver) {
        int count = request.getCount();
        this.serverResponseObserver =
                (ServerCallStreamObserver<NumberResponse>) responseObserver;
        this.onReadyHandler = new OnReadyHandler();
        serverResponseObserver.setOnReadyHandler(onReadyHandler);
        //serverResponseObserver.disableAutoRequest();
        System.out.println("Stream numbers start --> "
                + Thread.currentThread().getName());
        for (int i = 0; i <= count; i++) {
            NumberResponse response =
                    NumberResponse.newBuilder().setNumber(i).build();
            if (!serverResponseObserver.isReady()) {
                System.out.println("serverResponseObserver not ready----->"
                        + serverNotReadyCount++ + "current item -->" + i);
                this.onReadyHandler.wasReady = false;
                while (!onReadyHandler.wasReady) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            serverResponseObserver.onNext(response);
        }
        serverResponseObserver.onCompleted();
    }

    class OnReadyHandler implements Runnable {
        private boolean wasReady = false;

        @Override
        public void run() {
            System.out.println(" OnReadyHandler inside run->"
                    + serverResponseObserver.isReady() + wasReady
                    + onReadyHandlerRunCount++
                    + Thread.currentThread().getName());
            if (serverResponseObserver.isReady() && !wasReady) {
                System.out.println(" Response observer is ready and the "
                        + "Backpressure is released --> ");
                wasReady = true;
            }
        }
    }

I would appreciate any input from the community on this implementation, or 
ideas on how to handle back pressure correctly.
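
For comparison, here is a rough sketch of a non-blocking shape for the same 
loop. My understanding (an assumption, not something I have confirmed in the 
gRPC sources) is that grpc-java runs the callbacks of a single call one at a 
time, so the onReadyHandler cannot run while streamNumbers() is still 
sleeping. The idea below is to keep the loop position in a field, send only 
while isReady() is true, and return instead of sleeping, so that the next 
ready notification resumes the loop. ReadySink, Drain and FakeSink are 
hypothetical names made up for this sketch so that it runs without gRPC; 
ReadySink only stands in for the isReady()/onNext()/onCompleted() methods of 
ServerCallStreamObserver, and FakeSink imitates a transport that fills up.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: ReadySink is a hypothetical stand-in for the three
// ServerCallStreamObserver methods used here, so the example runs without gRPC.
final class BackpressureSketch {

    interface ReadySink<T> {
        boolean isReady();
        void onNext(T value);
        void onCompleted();
    }

    /** Resumable producer: run() sends while the sink is ready, then returns. */
    static final class Drain implements Runnable {
        private final ReadySink<Integer> sink;
        private final int count;
        private int next = 0;            // loop position survives between calls
        private boolean completed = false;

        Drain(ReadySink<Integer> sink, int count) {
            this.sink = sink;
            this.count = count;
        }

        @Override
        public void run() {
            if (completed) {
                return;                  // ignore late ready notifications
            }
            // Same range as the original loop: 0..count inclusive.
            while (sink.isReady() && next <= count) {
                sink.onNext(next++);
            }
            if (next > count) {
                completed = true;
                sink.onCompleted();
            }
            // Otherwise return without sleeping; the next ready
            // notification calls run() again and the loop resumes.
        }
    }

    /** Fake transport: takes `capacity` messages, then is not ready until flush(). */
    static final class FakeSink implements ReadySink<Integer> {
        final List<Integer> delivered = new ArrayList<>();
        private final int capacity;
        private int buffered = 0;
        boolean completed = false;
        Runnable onReadyHandler;         // plays the role of setOnReadyHandler(...)

        FakeSink(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public boolean isReady() {
            return buffered < capacity;
        }

        @Override
        public void onNext(Integer value) {
            delivered.add(value);
            buffered++;
        }

        @Override
        public void onCompleted() {
            completed = true;
        }

        /** Simulates the buffer draining and the ready callback firing. */
        void flush() {
            buffered = 0;
            if (onReadyHandler != null) {
                onReadyHandler.run();
            }
        }
    }
}
```

In the real service, I would expect streamNumbers() to create the Drain 
equivalent around serverResponseObserver, register it with 
setOnReadyHandler(...), and then return right away rather than looping and 
sleeping. Whether the handler also has to be run once by hand for the initial 
ready state is something I would still need to check.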
