This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c1fceb23986 Queries now self terminate if in panic mode. (#16380)
c1fceb23986 is described below
commit c1fceb23986f9b6aa460b2f307b72f811065f6c2
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Jul 25 05:41:32 2025 +0530
Queries now self terminate if in panic mode. (#16380)
* Queries now self terminate if in panic mode.
* Add config test
* Hard kill on critical level.
* Fix configs
* Separate anchor thread interruption.
* Checkstyle
* Review comments
* remove code for critical level
---------
Co-authored-by: Rajat Venkatesh <[email protected]>
---
.../accounting/PerQueryCPUMemAccountantFactory.java | 18 ++++++++++++++++++
.../pinot/core/accounting/QueryMonitorConfig.java | 21 +++++++++++++++++++++
.../core/accounting/QueryMonitorConfigTest.java | 17 +++++++++++++++++
.../accounting/ThreadResourceUsageAccountant.java | 9 +++++++++
.../java/org/apache/pinot/spi/trace/Tracing.java | 3 ++-
.../org/apache/pinot/spi/utils/CommonConstants.java | 4 ++++
6 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index e2f23664e39..46e700fe843 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -316,6 +316,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
return false;
}
+ @Override
+ public boolean isQueryTerminated() {
+ QueryMonitorConfig config = _watcherTask.getQueryMonitorConfig();
+ if (config.isThreadSelfTerminate() && _watcherTask.getHeapUsageBytes() > config.getPanicLevel()) {
+ logSelfTerminatedQuery(_threadLocalEntry.get().getQueryId(), Thread.currentThread());
+ return true;
+ }
+ return false;
+ }
+
@Override
@Deprecated
public void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
@@ -581,6 +591,14 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage, hasCallback);
}
+ protected void logSelfTerminatedQuery(String queryId, Thread queryThread) {
+ if (!_cancelSentQueries.contains(queryId)) {
+ LOGGER.warn("{} self-terminated. Heap Usage: {}. Query Thread: {}",
+ queryId, _watcherTask.getHeapUsageBytes(), queryThread.getName());
+ _cancelSentQueries.add(queryId);
+ }
+ }
+
@Override
public Exception getErrorStatus() {
return _threadLocalEntry.get()._errorStatus.getAndSet(null);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
index 327b34e4bd2..3efbc432e63 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -62,6 +62,8 @@ public class QueryMonitorConfig {
private final boolean _isQueryKilledMetricEnabled;
+ private final boolean _isThreadSelfTerminate;
+
public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
_maxHeapSize = maxHeapSize;
@@ -106,6 +108,9 @@ public class QueryMonitorConfig {
_isQueryKilledMetricEnabled =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);
+
+ _isThreadSelfTerminate = config.getProperty(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE,
+ CommonConstants.Accounting.DEFAULT_THREAD_SELF_TERMINATE);
}
QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs, Map<String, String> clusterConfigs) {
@@ -245,6 +250,18 @@ public class QueryMonitorConfig {
} else {
_isQueryKilledMetricEnabled = oldConfig._isQueryKilledMetricEnabled;
}
+
+ if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE)) {
+ if (clusterConfigs == null || !clusterConfigs.containsKey(
+ CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE)) {
+ _isThreadSelfTerminate = CommonConstants.Accounting.DEFAULT_THREAD_SELF_TERMINATE;
+ } else {
+ _isThreadSelfTerminate =
+ Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE));
+ }
+ } else {
+ _isThreadSelfTerminate = oldConfig._isThreadSelfTerminate;
+ }
}
public long getMaxHeapSize() {
@@ -294,4 +311,8 @@ public class QueryMonitorConfig {
public boolean isQueryKilledMetricEnabled() {
return _isQueryKilledMetricEnabled;
}
+
+ public boolean isThreadSelfTerminate() {
+ return _isThreadSelfTerminate;
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
index d5afcd63d66..79cd17c6e8f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
@@ -44,6 +44,7 @@ public class QueryMonitorConfigTest {
private static final boolean EXPECTED_IS_CPU_TIME_BASED_KILLING_ENABLED = true;
private static final long EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS = 1000;
private static final boolean EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED = true;
+ private static final boolean EXPECTED_IS_THREAD_SELF_TERMINATE_IN_PANIC_MODE = true;
private static final Map<String, String> CLUSTER_CONFIGS = new HashMap<>();
private static String getFullyQualifiedConfigName(String config) {
@@ -79,6 +80,9 @@ public class QueryMonitorConfigTest {
Double.toString(EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED),
Boolean.toString(EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED));
+ CLUSTER_CONFIGS.put(
+ getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE),
+ Boolean.toString(EXPECTED_IS_THREAD_SELF_TERMINATE_IN_PANIC_MODE));
}
@Test
@@ -243,4 +247,17 @@ public class QueryMonitorConfigTest {
CLUSTER_CONFIGS);
assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isQueryKilledMetricEnabled());
}
+
+ @Test
+ void testThreadSelfTerminateInPanicMode() {
+ PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant =
+ new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test",
+ InstanceType.SERVER);
+
+ assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isThreadSelfTerminate());
+ accountant.getWatcherTask().onChange(
+ Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE)),
+ CLUSTER_CONFIGS);
+ assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isThreadSelfTerminate());
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 4fdb4f1e1b6..dc912371980 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -37,6 +37,15 @@ public interface ThreadResourceUsageAccountant {
*/
boolean isAnchorThreadInterrupted();
+ /**
+ * This function is expected to be called by threads in query engine. The query id of the task will be available in
+ * the thread execution context stored in a thread local. Therefore it does not accept any parameters.
+ * @return true if the query is terminated, false otherwise
+ */
+ default boolean isQueryTerminated() {
+ return false;
+ }
+
/**
* This method has been deprecated and replaced by {@link setupRunner(String, int, ThreadExecutionContext.TaskType)}
* and {@link setupWorker(int, ThreadExecutionContext.TaskType, ThreadExecutionContext)}.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index afc9a283c1b..831e6d08b6b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -380,7 +380,8 @@ public class Tracing {
}
public static boolean isInterrupted() {
- return Thread.interrupted() || Tracing.getThreadAccountant().isAnchorThreadInterrupted();
+ return Thread.interrupted() || Tracing.getThreadAccountant().isAnchorThreadInterrupted()
+ || Tracing.getThreadAccountant().isQueryTerminated();
}
public static void sampleAndCheckInterruption() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 306f8d710ed..4a7676667e5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1548,6 +1548,10 @@ public class CommonConstants {
"accounting.cancel.callback.cache.expiry.seconds";
public static final int DEFAULT_CANCEL_CALLBACK_CACHE_EXPIRY_SECONDS = 1200;
+ public static final String CONFIG_OF_THREAD_SELF_TERMINATE =
+ "accounting.thread.self.terminate";
+ public static final boolean DEFAULT_THREAD_SELF_TERMINATE = false;
+
/**
* QUERY WORKLOAD ISOLATION Configs
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]