This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e9462fee01 Register config listener for ThreadAccountant on broker 
(#17771)
3e9462fee01 is described below

commit 3e9462fee010b0621ea9a90c1fa23c3aaff5b345
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Feb 26 10:51:03 2026 -0800

    Register config listener for ThreadAccountant on broker (#17771)
---
 .../pinot/broker/broker/helix/BaseBrokerStarter.java  | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 40ccf944cea..44105c5d22f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -99,6 +99,7 @@ import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.apache.pinot.spi.cursors.ResponseStoreService;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import 
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
@@ -139,10 +140,10 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   private volatile boolean _isShuttingDown = false;
 
   // Dedicated handler for listening to cluster config changes
-  protected final DefaultClusterConfigChangeHandler 
_defaultClusterConfigChangeHandler =
+  protected final DefaultClusterConfigChangeHandler 
_clusterConfigChangeHandler =
       new DefaultClusterConfigChangeHandler();
 
-  // TODO To be removed in favor of _defaultClusterConfigChangeHandler to 
manage config related changes.
+  // TODO To be removed in favor of _clusterConfigChangeHandler to manage 
config related changes.
   //      Please use this only if you are reliant specifically on the 
ClusterChangeMediator infra.
   protected final List<ClusterChangeHandler> _clusterConfigChangeHandlers = 
new ArrayList<>();
   protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new 
ArrayList<>();
@@ -385,6 +386,10 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _threadAccountant = 
ThreadAccountantUtils.createAccountant(schedulerConfig, _instanceId,
         org.apache.pinot.spi.config.instance.InstanceType.BROKER);
     _threadAccountant.startWatcherTask();
+    PinotClusterConfigChangeListener threadAccountantListener = 
_threadAccountant.getClusterConfigChangeListener();
+    if (threadAccountantListener != null) {
+      
_clusterConfigChangeHandler.registerClusterConfigChangeListener(threadAccountantListener);
+    }
 
     // TODO: Hook multiClusterRoutingContext into request handlers 
subsequently.
     MultiClusterRoutingContext multiClusterRoutingContext = 
getMultiClusterRoutingContext();
@@ -478,7 +483,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     }
 
     LOGGER.info("Wiring up cluster config change handler with helix");
-    
_spectatorHelixManager.addClusterfigChangeListener(_defaultClusterConfigChangeHandler);
+    
_spectatorHelixManager.addClusterfigChangeListener(_clusterConfigChangeHandler);
 
     LOGGER.info("Starting broker admin application on: {}", 
ListenerConfigUtil.toString(_listenerConfigs));
     _brokerAdminApplication = createBrokerAdminApp();
@@ -526,7 +531,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
       System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
 
-    
_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+    
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
 
     NettyInspector.registerMetrics(_brokerMetrics);
 
@@ -825,13 +830,17 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     return _brokerRequestHandler;
   }
 
+  public ThreadAccountant getThreadAccountant() {
+    return _threadAccountant;
+  }
+
   protected BrokerAdminApiApplication createBrokerAdminApp() {
     BrokerAdminApiApplication brokerAdminApiApplication =
         new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, 
_brokerMetrics, _brokerConf,
             _sqlQueryExecutor, _serverRoutingStatsManager, 
_accessControlFactory, _spectatorHelixManager,
             _queryQuotaManager, _threadAccountant, _responseStore);
     brokerAdminApiApplication.register(
-        new AuditServiceBinder(_defaultClusterConfigChangeHandler, 
getServiceRole(), _brokerMetrics));
+        new AuditServiceBinder(_clusterConfigChangeHandler, getServiceRole(), 
_brokerMetrics));
     registerExtraComponents(brokerAdminApiApplication);
     return brokerAdminApiApplication;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to