Copilot commented on code in PR #16378:
URL: https://github.com/apache/pinot/pull/16378#discussion_r2214691547


##########
pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java:
##########
@@ -265,4 +268,180 @@ void testQueryAggregationAddNewQueryTask() {
     threadLatch.countDown();
     newQueryThreadLatch.countDown();
   }
+
+  @Test
+  void testPerQueryMemoryCheckDisabled() {
+    // Test when per-query memory check is disabled
+    Map<String, Object> configs = new HashMap<>();
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK, 
false);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES, 
1000L);
+
+    PinotConfiguration config = new PinotConfiguration(configs);
+    TestMemoryCheckAccountant accountant = new 
TestMemoryCheckAccountant(config, false, 2000L);
+
+    // Should not interrupt when feature is disabled
+    accountant.checkMemoryAndInterruptIfExceeded();
+
+    // Verify no error was set
+    assertEquals(accountant.getErrorStatus(), null);
+    assertTrue(accountant._anchorThread.isAlive());
+  }
+
+  @Test
+  void testPerQueryMemoryCheckUnderLimit() {
+    // Test when memory usage is under the limit
+    Map<String, Object> configs = new HashMap<>();
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES, 
3000L);
+
+    PinotConfiguration config = new PinotConfiguration(configs);
+    TestMemoryCheckAccountant accountant = new 
TestMemoryCheckAccountant(config, true, 2000L);
+
+    // Should not interrupt when under limit
+    accountant.checkMemoryAndInterruptIfExceeded();
+
+    // Verify no error was set
+    assertEquals(accountant.getErrorStatus(), null);
+    assertTrue(accountant._anchorThread.isAlive());
+  }
+
+  @Test
+  void testPerQueryMemoryCheckOverLimit() {
+    // Test when memory usage exceeds the limit
+    Map<String, Object> configs = new HashMap<>();
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES, 
1000L);
+
+    PinotConfiguration config = new PinotConfiguration(configs);
+    TestMemoryCheckAccountant accountant = new 
TestMemoryCheckAccountant(config, true, 2000L);
+
+    // Should interrupt when over limit
+    accountant.checkMemoryAndInterruptIfExceeded();
+
+    // Verify error was set with appropriate message
+    Exception error = accountant.getTestErrorStatus();
+    assertNotNull(error);
+    assertTrue(error.getMessage().contains("exceeded per-query memory limit"));
+    assertTrue(error.getMessage().contains("testQuery"));
+    assertTrue(error.getMessage().contains("2000 bytes"));
+    assertTrue(error.getMessage().contains("1000 bytes"));
+
+    // Verify anchor thread interruption was attempted (the main functionality 
is the error message)
+    // Note: Thread.isInterrupted() might not immediately return true due to 
timing issues
+  }
+
+  @Test
+  void testPerQueryMemoryCheckMemorySamplingDisabled() {
+    // Test when memory sampling is disabled
+    Map<String, Object> configs = new HashMap<>();
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
false);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES, 
1000L);
+
+    PinotConfiguration config = new PinotConfiguration(configs);
+    TestMemoryCheckAccountant accountant = new 
TestMemoryCheckAccountant(config, false, 2000L);
+
+    // Should not interrupt when memory sampling is disabled
+    accountant.checkMemoryAndInterruptIfExceeded();
+
+    // Verify no error was set
+    assertEquals(accountant.getErrorStatus(), null);
+    assertTrue(accountant._anchorThread.isAlive());
+  }
+
+  @Test
+  void testPerQueryMemoryCheckNoActiveQuery() {
+    // Test when there's no active query on the thread
+    Map<String, Object> configs = new HashMap<>();
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES, 
1000L);
+
+    PinotConfiguration config = new PinotConfiguration(configs);
+    TestMemoryCheckAccountant accountant = new 
TestMemoryCheckAccountant(config, true, 2000L);
+
+    // Clear the active query
+    accountant._threadEntry._currentThreadTaskStatus.set(null);
+
+    // Should not interrupt when no active query
+    accountant.checkMemoryAndInterruptIfExceeded();
+
+    // Verify no error was set
+    assertEquals(accountant.getErrorStatus(), null);
+    assertTrue(accountant._anchorThread.isAlive());
+  }
+
+  /**
+   * Test helper class for memory checking functionality
+   */
+  private static class TestMemoryCheckAccountant
+      extends 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
+    final CPUMemThreadLevelAccountingObjects.ThreadEntry _threadEntry;
+    final Thread _anchorThread;
+
+    TestMemoryCheckAccountant(PinotConfiguration config, boolean 
isMemorySamplingEnabled, long memoryUsage) {
+      super(config, false, isMemorySamplingEnabled, true, new HashSet<>(), 
"testInstance", InstanceType.SERVER);

Review Comment:
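   [nitpick] The literal boolean arguments in the `super(...)` call (`false` as
the second argument and `true` as the fourth) make the test setup unclear,
because nothing at the call site says what they control. Consider using named
constants or local variables to clarify what these parameters represent.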



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1529,6 +1529,14 @@ public static class Accounting {
         "accounting.min.memory.footprint.to.kill.ratio";
     public static final double DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO = 0.025;
 
+    // Per-query memory threshold configurations
+    public static final String CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK = 
"accounting.per.query.memory.check.enabled";
+    public static final boolean DEFAULT_ENABLE_PER_QUERY_MEMORY_CHECK = false;
+
+    public static final String CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES = 
"accounting.per.query.memory.limit.bytes";
+    public static final long DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES = 
Runtime.getRuntime().maxMemory() / 3;
+    // 1/3 of heap size
+

Review Comment:
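   The default memory limit, `Runtime.getRuntime().maxMemory() / 3`, is computed
once, when the class is initialized, so it reflects the JVM's maximum heap as
seen at that moment. Note that `maxMemory()` does not return -1: when the JVM
has no inherent limit it returns `Long.MAX_VALUE`, in which case the default
becomes so large that the check can never fire. Consider moving this
calculation to a method or documenting the static initialization behavior.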
   ```suggestion
   
       public static long getDefaultPerQueryMemoryLimitBytes() {
           long maxMemory = Runtime.getRuntime().maxMemory();
           return (maxMemory > 0) ? maxMemory / 3 : 0; // Safeguard for 
maxMemory() returning -1
       }
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -291,6 +310,39 @@ public boolean throttleQuerySubmission() {
       return getWatcherTask().getHeapUsageBytes() > 
getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
     }
 
+    public void checkMemoryAndInterruptIfExceeded() {
+      if (!_isPerQueryMemoryCheckEnabled || !_isThreadMemorySamplingEnabled) {
+        return;
+      }
+
+      CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
_threadLocalEntry.get();
+      CPUMemThreadLevelAccountingObjects.TaskEntry currentTaskStatus = 
threadEntry.getCurrentThreadTaskStatus();
+
+      if (currentTaskStatus == null) {
+        return; // No active query on this thread
+      }
+
+      long currentMemoryUsage = 
threadEntry._currentThreadMemoryAllocationSampleBytes;
+      if (currentMemoryUsage > _perQueryMemoryLimitBytes) {
+        String queryId = currentTaskStatus.getQueryId();
+        String errorMessage = String.format(
+            "Query %s exceeded per-query memory limit of %d bytes (current 
usage: %d bytes) on %s: %s. "
+                + "Query terminated proactively to prevent OOM.",
+            queryId, _perQueryMemoryLimitBytes, currentMemoryUsage, 
_instanceType, _instanceId);
+
+        // Set error status to terminate the query
+        threadEntry._errorStatus.set(new RuntimeException(errorMessage));

Review Comment:
   Setting a generic RuntimeException for memory limit exceeded scenarios makes 
it difficult for callers to handle this specific error type. Consider creating 
a dedicated exception class like 'QueryMemoryLimitExceededException' for better 
error handling and monitoring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to