This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3e9462fee01 Register config listener for ThreadAccountant on broker
(#17771)
3e9462fee01 is described below
commit 3e9462fee010b0621ea9a90c1fa23c3aaff5b345
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Feb 26 10:51:03 2026 -0800
Register config listener for ThreadAccountant on broker (#17771)
---
.../pinot/broker/broker/helix/BaseBrokerStarter.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 40ccf944cea..44105c5d22f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -99,6 +99,7 @@ import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.cursors.ResponseStoreService;
import org.apache.pinot.spi.env.PinotConfiguration;
import
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
@@ -139,10 +140,10 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
private volatile boolean _isShuttingDown = false;
// Dedicated handler for listening to cluster config changes
- protected final DefaultClusterConfigChangeHandler
_defaultClusterConfigChangeHandler =
+ protected final DefaultClusterConfigChangeHandler
_clusterConfigChangeHandler =
new DefaultClusterConfigChangeHandler();
- // TODO To be removed in favor of _defaultClusterConfigChangeHandler to
manage config related changes.
+ // TODO To be removed in favor of _clusterConfigChangeHandler to manage
config related changes.
// Please use this only if you are reliant specifically on the
ClusterChangeMediator infra.
protected final List<ClusterChangeHandler> _clusterConfigChangeHandlers =
new ArrayList<>();
protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new
ArrayList<>();
@@ -385,6 +386,10 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_threadAccountant =
ThreadAccountantUtils.createAccountant(schedulerConfig, _instanceId,
org.apache.pinot.spi.config.instance.InstanceType.BROKER);
_threadAccountant.startWatcherTask();
+ PinotClusterConfigChangeListener threadAccountantListener =
_threadAccountant.getClusterConfigChangeListener();
+ if (threadAccountantListener != null) {
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(threadAccountantListener);
+ }
// TODO: Hook multiClusterRoutingContext into request handlers
subsequently.
MultiClusterRoutingContext multiClusterRoutingContext =
getMultiClusterRoutingContext();
@@ -478,7 +483,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
}
LOGGER.info("Wiring up cluster config change handler with helix");
-
_spectatorHelixManager.addClusterfigChangeListener(_defaultClusterConfigChangeHandler);
+
_spectatorHelixManager.addClusterfigChangeListener(_clusterConfigChangeHandler);
LOGGER.info("Starting broker admin application on: {}",
ListenerConfigUtil.toString(_listenerConfigs));
_brokerAdminApplication = createBrokerAdminApp();
@@ -526,7 +531,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
-
_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
NettyInspector.registerMetrics(_brokerMetrics);
@@ -825,13 +830,17 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
return _brokerRequestHandler;
}
+ public ThreadAccountant getThreadAccountant() {
+ return _threadAccountant;
+ }
+
protected BrokerAdminApiApplication createBrokerAdminApp() {
BrokerAdminApiApplication brokerAdminApiApplication =
new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler,
_brokerMetrics, _brokerConf,
_sqlQueryExecutor, _serverRoutingStatsManager,
_accessControlFactory, _spectatorHelixManager,
_queryQuotaManager, _threadAccountant, _responseStore);
brokerAdminApiApplication.register(
- new AuditServiceBinder(_defaultClusterConfigChangeHandler,
getServiceRole(), _brokerMetrics));
+ new AuditServiceBinder(_clusterConfigChangeHandler, getServiceRole(),
_brokerMetrics));
registerExtraComponents(brokerAdminApiApplication);
return brokerAdminApiApplication;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]