Yes the server method callback and onReady() are both handled by the same
executor. So you can not block on the method while waiting for the onReady
handler to be executed. Try something like this on your callback:
AtomicInteger sentMessage = new AtomicInteger(0);
final AtomicBoolean isClosed = new AtomicBoolean(false);
Runnable onReady = () -> {
while (((ServerCallStreamObserver<StreamingOutputCallResponse>)
responseObserver).isReady() && sentMessage.get() < 100) {
responseObserver.onNext(StreamingOutputCallResponse.newBuilder().build());
sentMessage.incrementAndGet();
}
if (sentMessage.get() >= 100 && !isClosed.get()) {
responseObserver.onCompleted();
isClosed.set(true);
}
};
((ServerCallStreamObserver)responseObserver).setOnReadyHandler(onReady);
onReady.run();
On Thursday, November 16, 2023 at 2:44:03 PM UTC-8 Vivek Shindhe wrote:
> In a server-side streaming scenario, we publish to the
> "ServerCallStreamObserver" a high number of items to the "onNext()" call.
> As recommended we need to handle the back pressure and check the readiness
> of the stream before sending the message. Hence using the method
> ServerCallStreamObserver's "isReady()". If not ready we will put the thread
> to sleep until the ServerCallStreamObserver is ready again.
>
> To do so we rely on setOnReadyHandler(Runnable onReadyHandler) which is
> supposed to be invoked when the ServerCallStreamObserver's isReady()
> transitions from false to true. But this leads to a thread block situation,
> while the main thread is waiting to restart again the "onReadyHandler" is
> never invoke which blocks the publishing to the stream. Later through the
> help of logs I see that the "onReadyHandler" is waiting to be invoked by
> the main thread itself. Sample program -
>
> @Override
> public void streamNumbers(NumberRequest request,
> StreamObserver<NumberResponse> responseObserver) {
> int count = request.getCount();
> this.serverResponseObserver =
> (ServerCallStreamObserver<NumberResponse>) responseObserver;
> this.onReadyHandler = new OnReadyHandler();
> serverResponseObserver.setOnReadyHandler(onReadyHandler);
> //serverResponseObserver.disableAutoRequest();
> System.out.println("Stream numbers start --> " +
> Thread.currentThread().getName());
> for (int i = 0; i <= count; i++) {
> NumberResponse response =
> NumberResponse.newBuilder().setNumber(i).build();
> if(!serverResponseObserver.isReady()) {
> System.out.println("serverResponseObserver not
> ready----->" + serverNotReadyCount++ + "current item -->" + i);
> this.onReadyHandler.wasReady = false;
> while(!onReadyHandler.wasReady) {
> try {
> Thread.sleep(1);
> } catch (InterruptedException e) {
> throw new RuntimeException(e);
> }
> }
> }
> serverResponseObserver.onNext(response);
> }
> serverResponseObserver.onCompleted();
> }
>
> class OnReadyHandler implements Runnable {
> private boolean wasReady = false;
>
> @Override
> public void run() {
> System.out.println( " OnReadyHandler inside run->" +
> serverResponseObserver.isReady() + wasReady + onReadyHandlerRunCount++ +
> Thread.currentThread().getName());
> if (serverResponseObserver.isReady() && !wasReady) {
> System.out.println( " Response observer is ready and the
> Backpressure is released --> ");
> wasReady = true;
> }
> }
> }
>
> Appreciate if the community could give some input on the implementation or
> provide me with idea to handle the backpressure in the correct way.
>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/c3735e8b-eecb-46a8-8f37-580d0f3a39dbn%40googlegroups.com.