lhotari commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2194776340


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java:
##########
@@ -106,6 +107,36 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
 
     private final BaseCommand cmd = new BaseCommand();
 
+    protected int maxPendingWriteBytes = -1;
+    protected boolean pauseAutoReadDueToChannelWriteBufFull;
+
+    private void pauseAutoReadIfChannelWriteBufIsFull(ChannelHandlerContext 
ctx) {
+        if (maxPendingWriteBytes < 0) {
+            return;
+        }
+        final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+        final BaseCommand.Type cmdType = cmd.getType();
+        if (!ctx.channel().isWritable() && cmdType != BaseCommand.Type.PONG
+                && outboundBuffer != null && 
outboundBuffer.totalPendingWriteBytes() > maxPendingWriteBytes) {
+            log.warn("[{}] is not writable, disable channel auto-read, pending 
output bytes: {}",
+                    this, outboundBuffer.totalPendingWriteBytes());
+            pauseAutoReadDueToChannelWriteBufFull = true;
+            ctx.channel().config().setAutoRead(false);
+        }
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
+        // To avoid the effect of producer limitation, which also will disable 
channel auto-read, we check
+        // "pauseAutoReadDueToChannelWriteBufFull" here.
+        if (ctx.channel().isWritable() && 
pauseAutoReadDueToChannelWriteBufFull) {
+            log.warn("[{}] is not writable, turn on channel auto-read", this);
+            pauseAutoReadDueToChannelWriteBufFull = false;
+            ctx.channel().config().setAutoRead(true);
+        }
+        ctx.fireChannelWritabilityChanged();
+    }

Review Comment:
   This solution uses an incorrect way for
   backpressuring based on outboundBuffer's totalPendingWriteBytes.
   Instead of having a new setting `connectionMaxPendingWriteBytes` the
   correct way would be to expose and configure WriteBufferWaterMark high
   and low settings for the child channel
   (`ChannelOption.WRITE_BUFFER_WATER_MARK`) and rely on
   `channelWritabilityChanged` and use `ServerCnxThrottleTracker`'s
   `incrementThrottleCount`/`decrementThrottleCount` there.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to