lhotari commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2194776340
##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java:
##########
@@ -106,6 +107,36 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
     private final BaseCommand cmd = new BaseCommand();
+    protected int maxPendingWriteBytes = -1;
+    protected boolean pauseAutoReadDueToChannelWriteBufFull;
+
+    private void pauseAutoReadIfChannelWriteBufIsFull(ChannelHandlerContext ctx) {
+        if (maxPendingWriteBytes < 0) {
+            return;
+        }
+        final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer();
+        final BaseCommand.Type cmdType = cmd.getType();
+        if (!ctx.channel().isWritable() && cmdType != BaseCommand.Type.PONG
+                && outboundBuffer != null && outboundBuffer.totalPendingWriteBytes() > maxPendingWriteBytes) {
+            log.warn("[{}] is not writable, disable channel auto-read, pending output bytes: {}",
+                    this, outboundBuffer.totalPendingWriteBytes());
+            pauseAutoReadDueToChannelWriteBufFull = true;
+            ctx.channel().config().setAutoRead(false);
+        }
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        // To avoid the effect of producer limitation, which also will disable channel auto-read, we check
+        // "pauseAutoReadDueToChannelWriteBufFull" here.
+        if (ctx.channel().isWritable() && pauseAutoReadDueToChannelWriteBufFull) {
+            log.warn("[{}] is not writable, turn on channel auto-read", this);
+            pauseAutoReadDueToChannelWriteBufFull = false;
+            ctx.channel().config().setAutoRead(true);
+        }
+        ctx.fireChannelWritabilityChanged();
+    }
Review Comment:
This solution applies backpressure incorrectly by checking the
outboundBuffer's totalPendingWriteBytes against a separate limit.
Instead of adding a new setting `connectionMaxPendingWriteBytes`, the
correct approach would be to expose and configure the
WriteBufferWaterMark high and low settings for the child channel
(`ChannelOption.WRITE_BUFFER_WATER_MARK`), rely on
`channelWritabilityChanged` for the writability transitions, and call
`ServerCnxThrottleTracker`'s
`incrementThrottleCount`/`decrementThrottleCount` there.
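
To illustrate the idea, here is a minimal, dependency-free sketch of
watermark-driven backpressure combined with a throttle counter. It is
a simplified model, not Netty or Pulsar code: the class
`WatermarkBackpressureSketch` and all of its members are hypothetical,
and the counter only stands in for what
`incrementThrottleCount`/`decrementThrottleCount` are assumed to do
(pause reads while any throttle reason is active). The transitions
mirror Netty's rule that a channel becomes unwritable once pending
bytes exceed the high water mark and writable again once they drop
below the low water mark.

```java
// Hypothetical, simplified model; none of this is a Netty or Pulsar API.
class WatermarkBackpressureSketch {
    private final long lowWaterMark;
    private final long highWaterMark;
    private long pendingWriteBytes;
    private boolean writable = true;
    private int throttleCount;       // assumed stand-in for a throttle tracker
    private boolean autoRead = true; // stand-in for the channel's auto-read flag

    WatermarkBackpressureSketch(long lowWaterMark, long highWaterMark) {
        if (lowWaterMark < 0 || highWaterMark < lowWaterMark) {
            throw new IllegalArgumentException("require 0 <= low <= high");
        }
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    // Queue outbound bytes; crossing the high water mark flips writability off.
    void write(long bytes) {
        pendingWriteBytes += bytes;
        if (writable && pendingWriteBytes > highWaterMark) {
            writable = false;
            channelWritabilityChanged();
        }
    }

    // Bytes reached the socket; dropping below the low water mark flips it back on.
    void flushed(long bytes) {
        pendingWriteBytes = Math.max(0, pendingWriteBytes - bytes);
        if (!writable && pendingWriteBytes < lowWaterMark) {
            writable = true;
            channelWritabilityChanged();
        }
    }

    // The only place where writability is translated into throttling.
    private void channelWritabilityChanged() {
        if (writable) {
            decrementThrottleCount();
        } else {
            incrementThrottleCount();
        }
    }

    // Reads pause when the first throttle reason appears.
    void incrementThrottleCount() {
        if (throttleCount++ == 0) {
            autoRead = false;
        }
    }

    // Reads resume only when the last throttle reason is gone.
    void decrementThrottleCount() {
        if (throttleCount == 0) {
            throw new IllegalStateException("throttle count underflow");
        }
        if (--throttleCount == 0) {
            autoRead = true;
        }
    }

    boolean isWritable() { return writable; }
    boolean isAutoRead() { return autoRead; }

    public static void main(String[] args) {
        WatermarkBackpressureSketch ch = new WatermarkBackpressureSketch(32, 64);
        ch.write(65);                // exceeds the high water mark
        System.out.println("writable=" + ch.isWritable() + " autoRead=" + ch.isAutoRead());
        ch.incrementThrottleCount(); // a second, unrelated throttle reason
        ch.flushed(40);              // pending bytes fall below the low water mark
        System.out.println("writable=" + ch.isWritable() + " autoRead=" + ch.isAutoRead());
        ch.decrementThrottleCount(); // the unrelated reason clears
        System.out.println("writable=" + ch.isWritable() + " autoRead=" + ch.isAutoRead());
    }
}
```

Counting throttle reasons, rather than tracking a dedicated boolean
such as `pauseAutoReadDueToChannelWriteBufFull`, means a writability
change cannot re-enable auto-read while another limiter (for example a
producer rate limit) still wants reads paused.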