lhotari commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2266509480
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -437,11 +449,37 @@ public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
}
}
+ protected void checkRateLimit(BaseCommand cmd) {
+ if (cmd.getType() == BaseCommand.Type.PONG && cmd.getType() ==
BaseCommand.Type.PING) {
+ return;
+ }
+ if (requestRateLimiter.acquire(1) == 0) {
+ log.warn("[{}] Reached rate limitation", this);
+ // Stop receiving requests.
+ pausedDueToRateLimitation = true;
+ ctx.channel().config().setAutoRead(false);
+ // Resume after 1 second.
+ ctx.channel().eventLoop().schedule(() -> {
+ if (pausedDueToRateLimitation) {
+ log.info("[{}] Resuming connection after rate limitation",
this);
+ ctx.channel().config().setAutoRead(true);
+ pausedDueToRateLimitation = false;
+ }
+ }, 1, TimeUnit.SECONDS);
+ }
+ }
+
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
- if (log.isDebugEnabled()) {
- log.debug("Channel writability has changed to: {}",
ctx.channel().isWritable());
+ if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
+ log.info("[{}] is writable, turn on channel auto-read", this);
+ ctx.channel().config().setAutoRead(true);
+
requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable,
TimeUnit.SECONDS);
+ } else if (pauseReceivingRequestsIfUnwritable &&
!ctx.channel().isWritable()) {
+ log.info("[{}] is not writable, turn off channel auto-read", this);
+ ctx.channel().config().setAutoRead(false);
Review Comment:
This PR is targeting master branch. You should use
`ServerCnxThrottleTracker` in master branch PRs. A separate backport can be
made for Pulsar 3.0 if this PIP is also for Pulsar 3.0.x.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -437,11 +449,37 @@ public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
}
}
+ protected void checkRateLimit(BaseCommand cmd) {
+ if (cmd.getType() == BaseCommand.Type.PONG && cmd.getType() ==
BaseCommand.Type.PING) {
+ return;
+ }
+ if (requestRateLimiter.acquire(1) == 0) {
+ log.warn("[{}] Reached rate limitation", this);
+ // Stop receiving requests.
+ pausedDueToRateLimitation = true;
+ ctx.channel().config().setAutoRead(false);
+ // Resume after 1 second.
+ ctx.channel().eventLoop().schedule(() -> {
+ if (pausedDueToRateLimitation) {
+ log.info("[{}] Resuming connection after rate limitation",
this);
+ ctx.channel().config().setAutoRead(true);
+ pausedDueToRateLimitation = false;
+ }
+ }, 1, TimeUnit.SECONDS);
+ }
Review Comment:
This PR is targeting master branch. You should use
`ServerCnxThrottleTracker` in master branch PRs. A separate backport can be
made for Pulsar 3.0 if this PIP is also for Pulsar 3.0.x.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]