Jackie-Jiang commented on code in PR #16339:
URL: https://github.com/apache/pinot/pull/16339#discussion_r2216649285


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -203,9 +277,66 @@ public void throttle(int numMsgs) {
 
     @Override
     public String toString() {
-      return "RateLimiterImpl{"
+      return "PartitionRateLimiter{"
           + "_rate=" + _rate
           + ", _rateLimiter=" + _rateLimiter
+          + ", _quotaUtilizationTracker=" + _quotaUtilizationTracker
+          + '}';
+    }
+  }
+
+  /**
+   * {@code ServerRateLimiter} is an implementation of {@link 
ConsumptionRateLimiter} that uses Guava's
+   * {@link com.google.common.util.concurrent.RateLimiter} to throttle the 
rate of consumption at entire server
+   * level based on a configurable rate limit (in permits per second).
+   * <p>
+   * It supports dynamically updating the rate limit and emits metrics 
asynchronously to track quota utilization
+   * via {@link AsyncMetricEmitter}.
+   *
+   * <p>This class is thread-safe
+   */
+  static class ServerRateLimiter implements ConsumptionRateLimiter {

Review Comment:
   Add `@VisibleForTesting`



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -231,41 +362,87 @@ interface PartitionCountFetcher {
   };
 
   /**
-   * This class is responsible to emit a gauge metric for the ratio of the 
actual consumption rate to the rate limit.
-   * Number of messages consumed are aggregated over one minute. Each minute 
the ratio percentage is calculated and
-   * emitted.
+   * Asynchronously emits the quota utilization metric for a shared rate 
limiter (e.g., server-wide).
+   * <p>
+   * This class aggregates consumed message counts over a fixed time interval 
(default: 60 seconds) using a
+   * high-performance {@link java.util.concurrent.atomic.LongAdder}. A 
scheduled background task computes the
+   * actual message rate and reports the quota utilization ratio as a gauge 
metric.
+   * <p>
+   * This design avoids contention from multiple threads calling emit logic 
and ensures non-blocking accumulation
+   * of message counts. Thread-safe and suitable for shared use.
+   * <p>
+   * Usage:
+   *   - Call {@link #record(int)} to record messages consumed.
+   *   - Call {@link #start()} once to schedule metric emission.
+   *   - Optionally call {@link #close()} to stop the emitter.
+   * <p>
+   * Thread-safe.
    */
-  @VisibleForTesting
-  static class MetricEmitter {
+  static class AsyncMetricEmitter {
+    private static final int METRIC_EMIT_FREQUENCY_SEC = 60;
+    private final AtomicReference<Double> _rateLimit;

Review Comment:
   Use `AtomicDouble`



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -87,24 +92,35 @@ public ConsumptionRateLimiter 
createServerRateLimiter(PinotConfiguration serverC
     double serverRateLimit =
         
serverConfig.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
             CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT);
-    _serverRateLimiter = createServerRateLimiter(serverRateLimit, 
serverMetrics);
+    createServerRateLimiter(serverRateLimit, serverMetrics);
     return _serverRateLimiter;
   }
 
-  private ConsumptionRateLimiter createServerRateLimiter(double 
serverRateLimit, ServerMetrics serverMetrics) {
+  private void createServerRateLimiter(double serverRateLimit, ServerMetrics 
serverMetrics) {

Review Comment:
   Rename to `createOrUpdateServerRateLimiter`



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -231,41 +362,87 @@ interface PartitionCountFetcher {
   };
 
   /**
-   * This class is responsible to emit a gauge metric for the ratio of the 
actual consumption rate to the rate limit.
-   * Number of messages consumed are aggregated over one minute. Each minute 
the ratio percentage is calculated and
-   * emitted.
+   * Asynchronously emits the quota utilization metric for a shared rate 
limiter (e.g., server-wide).
+   * <p>
+   * This class aggregates consumed message counts over a fixed time interval 
(default: 60 seconds) using a
+   * high-performance {@link java.util.concurrent.atomic.LongAdder}. A 
scheduled background task computes the
+   * actual message rate and reports the quota utilization ratio as a gauge 
metric.
+   * <p>
+   * This design avoids contention from multiple threads calling emit logic 
and ensures non-blocking accumulation
+   * of message counts. Thread-safe and suitable for shared use.
+   * <p>
+   * Usage:
+   *   - Call {@link #record(int)} to record messages consumed.
+   *   - Call {@link #start()} once to schedule metric emission.
+   *   - Optionally call {@link #close()} to stop the emitter.
+   * <p>
+   * Thread-safe.
    */
-  @VisibleForTesting
-  static class MetricEmitter {
+  static class AsyncMetricEmitter {
+    private static final int METRIC_EMIT_FREQUENCY_SEC = 60;
+    private final AtomicReference<Double> _rateLimit;
+    private final LongAdder _messageCount = new LongAdder();
+    private final ScheduledExecutorService _executor;
+    private final AtomicBoolean _running = new AtomicBoolean(false);
+    private final QuotaUtilizationTracker _tracker;
 
-    private final ServerMetrics _serverMetrics;
-    private final String _metricKeyName;
+    public AsyncMetricEmitter(ServerMetrics serverMetrics, String 
metricKeyName, double initialRateLimit) {
+      _rateLimit = new AtomicReference<>(initialRateLimit);
+      _tracker = new QuotaUtilizationTracker(serverMetrics, metricKeyName);
+      _executor = Executors.newSingleThreadScheduledExecutor(r -> {
+        Thread t = new Thread(r, "server-rate-limit-metric-emitter");
+        t.setDaemon(true);
+        return t;
+      });
+    }
 
-    // state variables
-    private long _previousMinute = -1;
-    private int _aggregateNumMessages = 0;
+    public void start() {
+      if (_running.compareAndSet(false, true)) {
+        _executor.scheduleAtFixedRate(this::emit, 0, 
METRIC_EMIT_FREQUENCY_SEC, TimeUnit.SECONDS);
+      }
+    }
 
-    public MetricEmitter(ServerMetrics serverMetrics, String metricKeyName) {
-      _serverMetrics = serverMetrics;
-      _metricKeyName = metricKeyName;
+    @VisibleForTesting
+    void start(int initialDelayInSeconds, int emitFrequencyInSeconds) {
+      if (_running.compareAndSet(false, true)) {
+        _executor.scheduleAtFixedRate(this::emit, initialDelayInSeconds, 
emitFrequencyInSeconds, TimeUnit.SECONDS);
+      }
     }
 
-    int emitMetric(int numMsgsConsumed, double rateLimit, Instant now) {
-      int ratioPercentage = 0;
-      long nowInMinutes = now.getEpochSecond() / 60;
-      if (nowInMinutes == _previousMinute) {
-        _aggregateNumMessages += numMsgsConsumed;
-      } else {
-        if (_previousMinute != -1) { // not first time
-          double actualRate = _aggregateNumMessages / ((nowInMinutes - 
_previousMinute) * 60.0); // messages per second
-          ratioPercentage = (int) Math.round(actualRate / rateLimit * 100);
-          _serverMetrics.setValueOfTableGauge(_metricKeyName, 
ServerGauge.CONSUMPTION_QUOTA_UTILIZATION,
-              ratioPercentage);
-        }
-        _aggregateNumMessages = numMsgsConsumed;
-        _previousMinute = nowInMinutes;
+    public void setRateLimit(double newRateLimit) {
+      _rateLimit.set(newRateLimit);
+    }
+
+    public void record(int numMsgsConsumed) {
+      _messageCount.add(numMsgsConsumed);
+    }
+
+    private void emit() {
+      double rateLimit = _rateLimit.get();

Review Comment:
   Consider adding a try-catch and logging a warning if it fails to update the rate. 
This can help with debugging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to