poorbarcode commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2203212100


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java:
##########
@@ -106,6 +107,36 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
 
     private final BaseCommand cmd = new BaseCommand();
 
+    protected int maxPendingWriteBytes = -1;
+    protected boolean pauseAutoReadDueToChannelWriteBufFull;
+
+    private void pauseAutoReadIfChannelWriteBufIsFull(ChannelHandlerContext 
ctx) {
+        if (maxPendingWriteBytes < 0) {
+            return;
+        }
+        final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+        final BaseCommand.Type cmdType = cmd.getType();
+        if (!ctx.channel().isWritable() && cmdType != BaseCommand.Type.PONG
+                && outboundBuffer != null && 
outboundBuffer.totalPendingWriteBytes() > maxPendingWriteBytes) {
+            log.warn("[{}] is not writable, disable channel auto-read, pending 
output bytes: {}",
+                    this, outboundBuffer.totalPendingWriteBytes());
+            pauseAutoReadDueToChannelWriteBufFull = true;
+            ctx.channel().config().setAutoRead(false);
+        }
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
+        // To avoid the effect of producer limitation, which also will disable 
channel auto-read, we check
+        // "pauseAutoReadDueToChannelWriteBufFull" here.
+        if (ctx.channel().isWritable() && 
pauseAutoReadDueToChannelWriteBufFull) {
+            log.warn("[{}] is not writable, turn on channel auto-read", this);
+            pauseAutoReadDueToChannelWriteBufFull = false;
+            ctx.channel().config().setAutoRead(true);
+        }
+        ctx.fireChannelWritabilityChanged();
+    }

Review Comment:
   Answered on the mailing list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to