This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fceb23986 Queries now self terminate if in panic mode. (#16380)
c1fceb23986 is described below

commit c1fceb23986f9b6aa460b2f307b72f811065f6c2
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Jul 25 05:41:32 2025 +0530

    Queries now self terminate if in panic mode. (#16380)
    
    * Queries now self terminate if in panic mode.
    
    * Add config test
    
    * Hard kill on critical level.
    
    * Fix configs
    
    * Separate anchor thread interruption.
    
    * Checkstyle
    
    * Review comments
    
    * remove code for critical level
    
    ---------
    
    Co-authored-by: Rajat Venkatesh <[email protected]>
---
 .../accounting/PerQueryCPUMemAccountantFactory.java | 18 ++++++++++++++++++
 .../pinot/core/accounting/QueryMonitorConfig.java   | 21 +++++++++++++++++++++
 .../core/accounting/QueryMonitorConfigTest.java     | 17 +++++++++++++++++
 .../accounting/ThreadResourceUsageAccountant.java   |  9 +++++++++
 .../java/org/apache/pinot/spi/trace/Tracing.java    |  3 ++-
 .../org/apache/pinot/spi/utils/CommonConstants.java |  4 ++++
 6 files changed, 71 insertions(+), 1 deletion(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index e2f23664e39..46e700fe843 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -316,6 +316,16 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       return false;
     }
 
+    @Override
+    public boolean isQueryTerminated() {
+      QueryMonitorConfig config = _watcherTask.getQueryMonitorConfig();
+      if (config.isThreadSelfTerminate() && _watcherTask.getHeapUsageBytes() > 
config.getPanicLevel()) {
+        logSelfTerminatedQuery(_threadLocalEntry.get().getQueryId(), 
Thread.currentThread());
+        return true;
+      }
+      return false;
+    }
+
     @Override
     @Deprecated
     public void createExecutionContext(String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType,
@@ -581,6 +591,14 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
           queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage, 
hasCallback);
     }
 
+    protected void logSelfTerminatedQuery(String queryId, Thread queryThread) {
+      if (!_cancelSentQueries.contains(queryId)) {
+        LOGGER.warn("{} self-terminated. Heap Usage: {}. Query Thread: {}",
+            queryId, _watcherTask.getHeapUsageBytes(), queryThread.getName());
+        _cancelSentQueries.add(queryId);
+      }
+    }
+
     @Override
     public Exception getErrorStatus() {
       return _threadLocalEntry.get()._errorStatus.getAndSet(null);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
index 327b34e4bd2..3efbc432e63 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -62,6 +62,8 @@ public class QueryMonitorConfig {
 
   private final boolean _isQueryKilledMetricEnabled;
 
+  private final boolean _isThreadSelfTerminate;
+
   public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
     _maxHeapSize = maxHeapSize;
 
@@ -106,6 +108,9 @@ public class QueryMonitorConfig {
 
     _isQueryKilledMetricEnabled = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
         CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);
+
+    _isThreadSelfTerminate = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE,
+        CommonConstants.Accounting.DEFAULT_THREAD_SELF_TERMINATE);
   }
 
   QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs, 
Map<String, String> clusterConfigs) {
@@ -245,6 +250,18 @@ public class QueryMonitorConfig {
     } else {
       _isQueryKilledMetricEnabled = oldConfig._isQueryKilledMetricEnabled;
     }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE)) {
+        _isThreadSelfTerminate = 
CommonConstants.Accounting.DEFAULT_THREAD_SELF_TERMINATE;
+      } else {
+        _isThreadSelfTerminate =
+            
Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE));
+      }
+    } else {
+      _isThreadSelfTerminate = oldConfig._isThreadSelfTerminate;
+    }
   }
 
   public long getMaxHeapSize() {
@@ -294,4 +311,8 @@ public class QueryMonitorConfig {
   public boolean isQueryKilledMetricEnabled() {
     return _isQueryKilledMetricEnabled;
   }
+
+  public boolean isThreadSelfTerminate() {
+    return _isThreadSelfTerminate;
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
index d5afcd63d66..79cd17c6e8f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
@@ -44,6 +44,7 @@ public class QueryMonitorConfigTest {
   private static final boolean EXPECTED_IS_CPU_TIME_BASED_KILLING_ENABLED = 
true;
   private static final long EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS = 
1000;
   private static final boolean EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED = true;
+  private static final boolean EXPECTED_IS_THREAD_SELF_TERMINATE_IN_PANIC_MODE 
= true;
   private static final Map<String, String> CLUSTER_CONFIGS = new HashMap<>();
 
   private static String getFullyQualifiedConfigName(String config) {
@@ -79,6 +80,9 @@ public class QueryMonitorConfigTest {
         Double.toString(EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL));
     
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED),
         Boolean.toString(EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED));
+    CLUSTER_CONFIGS.put(
+        
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE),
+        Boolean.toString(EXPECTED_IS_THREAD_SELF_TERMINATE_IN_PANIC_MODE));
   }
 
   @Test
@@ -243,4 +247,17 @@ public class QueryMonitorConfigTest {
             CLUSTER_CONFIGS);
     
assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isQueryKilledMetricEnabled());
   }
+
+  @Test
+  void testThreadSelfTerminateInPanicMode() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isThreadSelfTerminate());
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_THREAD_SELF_TERMINATE)),
+        CLUSTER_CONFIGS);
+    
assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isThreadSelfTerminate());
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 4fdb4f1e1b6..dc912371980 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -37,6 +37,15 @@ public interface ThreadResourceUsageAccountant {
    */
   boolean isAnchorThreadInterrupted();
 
+  /**
+   * This function is expected to be called by threads in query engine. The 
query id of the task will be available in
+   * the thread execution context stored in a thread local. Therefore it does 
not accept any parameters.
+   * @return true if the query is terminated, false otherwise
+   */
+  default boolean isQueryTerminated() {
+    return false;
+  }
+
   /**
    * This method has been deprecated and replaced by {@link 
setupRunner(String, int, ThreadExecutionContext.TaskType)}
    * and {@link setupWorker(int, ThreadExecutionContext.TaskType, 
ThreadExecutionContext)}.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index afc9a283c1b..831e6d08b6b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -380,7 +380,8 @@ public class Tracing {
     }
 
     public static boolean isInterrupted() {
-      return Thread.interrupted() || 
Tracing.getThreadAccountant().isAnchorThreadInterrupted();
+      return Thread.interrupted() || 
Tracing.getThreadAccountant().isAnchorThreadInterrupted()
+          || Tracing.getThreadAccountant().isQueryTerminated();
     }
 
     public static void sampleAndCheckInterruption() {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 306f8d710ed..4a7676667e5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1548,6 +1548,10 @@ public class CommonConstants {
         "accounting.cancel.callback.cache.expiry.seconds";
     public static final int DEFAULT_CANCEL_CALLBACK_CACHE_EXPIRY_SECONDS = 
1200;
 
+    public static final String CONFIG_OF_THREAD_SELF_TERMINATE =
+        "accounting.thread.self.terminate";
+    public static final boolean DEFAULT_THREAD_SELF_TERMINATE = false;
+
     /**
      * QUERY WORKLOAD ISOLATION Configs
      *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to