This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch hotfix-zk-watcher-2
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/hotfix-zk-watcher-2 by this push:
new 4fa09f6163 Reduce ZK access for SendStatsPredicate (#15895) (#15917)
4fa09f6163 is described below
commit 4fa09f6163370eda56dea6d9bc014ca290b72e48
Author: Praveen <[email protected]>
AuthorDate: Tue May 27 15:14:25 2025 -0700
Reduce ZK access for SendStatsPredicate (#15895) (#15917)
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
---
.../server/starter/helix/BaseServerStarter.java | 14 +-
.../server/starter/helix/SendStatsPredicate.java | 141 ++++++++++++++++-----
.../pinot/server/worker/WorkerQueryServer.java | 2 +-
3 files changed, 117 insertions(+), 40 deletions(-)
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 5704d44875..689ace66f7 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -672,7 +672,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler,
segmentStarTreePreprocessThrottler,
segmentDownloadThrottler);
- SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf);
+ SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf, _helixManager);
ServerConf serverConf = new ServerConf(_serverConf);
_serverInstance = new ServerInstance(serverConf, _helixManager,
_accessControlFactory, _segmentOperationsThrottler,
sendStatsPredicate);
@@ -706,11 +706,13 @@ public abstract class BaseServerStarter implements ServiceStartable {
}
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler);
- LOGGER.info("Initializing and registering the SendStatsPredicate");
- try {
- _helixManager.addInstanceConfigChangeListener(sendStatsPredicate);
- } catch (Exception e) {
- LOGGER.error("Failed to register SendStatsPredicate as the Helix
InstanceConfigChangeListener", e);
+ if (sendStatsPredicate.needWatchForInstanceConfigChange()) {
+ LOGGER.info("Initializing and registering the SendStatsPredicate");
+ try {
+ _helixManager.addInstanceConfigChangeListener(sendStatsPredicate);
+ } catch (Exception e) {
+ LOGGER.error("Failed to register SendStatsPredicate as the Helix
InstanceConfigChangeListener", e);
+ }
}
// Start restlet server for admin API endpoint
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
index 833f7d4504..4678cab2dd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
@@ -22,12 +22,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.version.PinotVersion;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
@@ -54,10 +58,13 @@ import org.slf4j.LoggerFactory;
public abstract class SendStatsPredicate implements
InstanceConfigChangeListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(SendStatsPredicate.class);
- public abstract boolean getSendStats();
+ public abstract boolean isSendStats();
- public static SendStatsPredicate create(PinotConfiguration configuration) {
- String modeStr = configuration.getProperty(
+ public abstract boolean needWatchForInstanceConfigChange();
+
+ // NOTE: When this method is called, the helix manager is not yet connected.
+ public static SendStatsPredicate create(PinotConfiguration serverConf,
HelixManager helixManager) {
+ String modeStr = serverConf.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE,
CommonConstants.MultiStageQueryRunner.DEFAULT_SEND_STATS_MODE).toUpperCase(Locale.ENGLISH);
Mode mode;
@@ -67,87 +74,155 @@ public abstract class SendStatsPredicate implements InstanceConfigChangeListener
throw new IllegalArgumentException("Invalid value " + modeStr + " for "
+ CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, e);
}
- return mode.create();
+ return mode.create(helixManager);
}
public enum Mode {
SAFE {
@Override
- public SendStatsPredicate create() {
- return new Safe();
+ public SendStatsPredicate create(HelixManager helixManager) {
+ return new Safe(helixManager);
}
},
ALWAYS {
@Override
- public SendStatsPredicate create() {
+ public SendStatsPredicate create(HelixManager helixManager) {
return new SendStatsPredicate() {
@Override
- public boolean getSendStats() {
+ public boolean isSendStats() {
return true;
}
+ @Override
+ public boolean needWatchForInstanceConfigChange() {
+ return false;
+ }
+
@Override
public void onInstanceConfigChange(List<InstanceConfig>
instanceConfigs, NotificationContext context) {
- // Nothing to do
+ throw new UnsupportedOperationException("Should not be invoked");
}
};
}
},
NEVER {
@Override
- public SendStatsPredicate create() {
+ public SendStatsPredicate create(HelixManager helixManager) {
return new SendStatsPredicate() {
@Override
- public boolean getSendStats() {
+ public boolean isSendStats() {
+ return false;
+ }
+
+ @Override
+ public boolean needWatchForInstanceConfigChange() {
return false;
}
@Override
public void onInstanceConfigChange(List<InstanceConfig>
instanceConfigs, NotificationContext context) {
- // Nothing to do
+ throw new UnsupportedOperationException("Should not be invoked");
}
};
}
};
- public abstract SendStatsPredicate create();
+ public abstract SendStatsPredicate create(HelixManager helixManager);
}
+ @BatchMode(enabled = false)
+ @PreFetch(enabled = false)
private static class Safe extends SendStatsPredicate {
- private final AtomicBoolean _sendStats = new AtomicBoolean(true);
+ private final HelixManager _helixManager;
+ private final String _clusterName;
+ private final Map<String, String> _problematicVersionsById = new
HashMap<>();
+
+ private HelixAdmin _helixAdmin;
+ private volatile boolean _sendStats = true;
+
+ public Safe(HelixManager helixManager) {
+ _helixManager = helixManager;
+ _clusterName = helixManager.getClusterName();
+ }
+
+ @Override
+ public boolean isSendStats() {
+ return _sendStats;
+ }
@Override
- public boolean getSendStats() {
- return _sendStats.get();
+ public boolean needWatchForInstanceConfigChange() {
+ return true;
}
@Override
- public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
NotificationContext context) {
- Map<String, String> problematicVersionsById = new HashMap<>();
- for (InstanceConfig instanceConfig : instanceConfigs) {
- switch
(InstanceTypeUtils.getInstanceType(instanceConfig.getInstanceName())) {
- case BROKER:
- case SERVER:
- String otherVersion = instanceConfig.getRecord()
-
.getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, null);
- if (isProblematicVersion(otherVersion)) {
- problematicVersionsById.put(instanceConfig.getInstanceName(),
otherVersion);
+ public synchronized void onInstanceConfigChange(List<InstanceConfig>
instanceConfigs, NotificationContext context) {
+ if (_helixAdmin == null) {
+ _helixAdmin = _helixManager.getClusterManagmentTool();
+ }
+ NotificationContext.Type type = context.getType();
+ if (type != NotificationContext.Type.INIT && type !=
NotificationContext.Type.CALLBACK) {
+ LOGGER.warn("Ignoring notification type: {} for instance config
change", type);
+ return;
+ }
+ if (type == NotificationContext.Type.INIT || context.getIsChildChange())
{
+ _problematicVersionsById.clear();
+ for (String instance :
_helixAdmin.getInstancesInCluster(_clusterName)) {
+ if (needVersionCheck(instance)) {
+ InstanceConfig instanceConfig;
+ try {
+ instanceConfig = _helixAdmin.getInstanceConfig(_clusterName,
instance);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get instance config for instance: {},
continue", instance, e);
+ continue;
}
- break;
- default:
- continue;
+ String version = getVersion(instanceConfig);
+ if (isProblematicVersion(version)) {
+ _problematicVersionsById.put(instance, version);
+ }
+ }
+ }
+ } else {
+ String pathChanged = context.getPathChanged();
+ String instanceName =
pathChanged.substring(pathChanged.lastIndexOf('/') + 1);
+ if (needVersionCheck(instanceName)) {
+ InstanceConfig instanceConfig;
+ try {
+ instanceConfig = _helixAdmin.getInstanceConfig(_clusterName,
instanceName);
+ String version = getVersion(instanceConfig);
+ if (isProblematicVersion(version)) {
+ _problematicVersionsById.put(instanceName, version);
+ } else {
+ _problematicVersionsById.remove(instanceName);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get instance config for instance: {},
treating it as non-problematic", instanceName,
+ e);
+ _problematicVersionsById.remove(instanceName);
+ }
}
}
- boolean sendStats = problematicVersionsById.isEmpty();
- if (_sendStats.getAndSet(sendStats) != sendStats) {
+ boolean sendStats = _problematicVersionsById.isEmpty();
+ if (_sendStats != sendStats) {
+ _sendStats = sendStats;
if (sendStats) {
LOGGER.warn("Send MSE stats is now enabled");
} else {
- LOGGER.warn("Send MSE stats is now disabled (problematic versions:
{})", problematicVersionsById);
+ LOGGER.warn("Send MSE stats is now disabled (problematic versions:
{})", _problematicVersionsById);
}
}
}
+ private boolean needVersionCheck(String instanceName) {
+ InstanceType instanceType =
InstanceTypeUtils.getInstanceType(instanceName);
+ return instanceType == InstanceType.BROKER || instanceType ==
InstanceType.SERVER;
+ }
+
+ @Nullable
+ private String getVersion(InstanceConfig instanceConfig) {
+ return
instanceConfig.getRecord().getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY,
null);
+ }
+
/// Returns true if the version is problematic
///
/// Ideally [PinotVersion] should have a way to extract versions in
comparable format, but given it doesn't we
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 9fa5af663d..9a2e976064 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -41,7 +41,7 @@ public class WorkerQueryServer {
_queryServicePort =
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
QueryRunner queryRunner = new QueryRunner();
- queryRunner.init(_configuration, instanceDataManager, tlsConfig,
sendStats::getSendStats);
+ queryRunner.init(_configuration, instanceDataManager, tlsConfig,
sendStats::isSendStats);
_queryWorkerService = new QueryServer(_queryServicePort, queryRunner,
tlsConfig, configuration);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]