Copilot commented on code in PR #16378:
URL: https://github.com/apache/pinot/pull/16378#discussion_r2214691547
##########
pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java:
##########
@@ -265,4 +268,180 @@ void testQueryAggregationAddNewQueryTask() {
threadLatch.countDown();
newQueryThreadLatch.countDown();
}
+
+ @Test
+ void testPerQueryMemoryCheckDisabled() {
+ // Test when per-query memory check is disabled
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK,
false);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES,
1000L);
+
+ PinotConfiguration config = new PinotConfiguration(configs);
+ TestMemoryCheckAccountant accountant = new
TestMemoryCheckAccountant(config, false, 2000L);
+
+ // Should not interrupt when feature is disabled
+ accountant.checkMemoryAndInterruptIfExceeded();
+
+ // Verify no error was set
+ assertEquals(accountant.getErrorStatus(), null);
+ assertTrue(accountant._anchorThread.isAlive());
+ }
+
+ @Test
+ void testPerQueryMemoryCheckUnderLimit() {
+ // Test when memory usage is under the limit
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES,
3000L);
+
+ PinotConfiguration config = new PinotConfiguration(configs);
+ TestMemoryCheckAccountant accountant = new
TestMemoryCheckAccountant(config, true, 2000L);
+
+ // Should not interrupt when under limit
+ accountant.checkMemoryAndInterruptIfExceeded();
+
+ // Verify no error was set
+ assertEquals(accountant.getErrorStatus(), null);
+ assertTrue(accountant._anchorThread.isAlive());
+ }
+
+ @Test
+ void testPerQueryMemoryCheckOverLimit() {
+ // Test when memory usage exceeds the limit
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES,
1000L);
+
+ PinotConfiguration config = new PinotConfiguration(configs);
+ TestMemoryCheckAccountant accountant = new
TestMemoryCheckAccountant(config, true, 2000L);
+
+ // Should interrupt when over limit
+ accountant.checkMemoryAndInterruptIfExceeded();
+
+ // Verify error was set with appropriate message
+ Exception error = accountant.getTestErrorStatus();
+ assertNotNull(error);
+ assertTrue(error.getMessage().contains("exceeded per-query memory limit"));
+ assertTrue(error.getMessage().contains("testQuery"));
+ assertTrue(error.getMessage().contains("2000 bytes"));
+ assertTrue(error.getMessage().contains("1000 bytes"));
+
+ // Verify anchor thread interruption was attempted (the main functionality
is the error message)
+ // Note: Thread.isInterrupted() might not immediately return true due to
timing issues
+ }
+
+ @Test
+ void testPerQueryMemoryCheckMemorySamplingDisabled() {
+ // Test when memory sampling is disabled
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
false);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES,
1000L);
+
+ PinotConfiguration config = new PinotConfiguration(configs);
+ TestMemoryCheckAccountant accountant = new
TestMemoryCheckAccountant(config, false, 2000L);
+
+ // Should not interrupt when memory sampling is disabled
+ accountant.checkMemoryAndInterruptIfExceeded();
+
+ // Verify no error was set
+ assertEquals(accountant.getErrorStatus(), null);
+ assertTrue(accountant._anchorThread.isAlive());
+ }
+
+ @Test
+ void testPerQueryMemoryCheckNoActiveQuery() {
+ // Test when there's no active query on the thread
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
+
configs.put(CommonConstants.Accounting.CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES,
1000L);
+
+ PinotConfiguration config = new PinotConfiguration(configs);
+ TestMemoryCheckAccountant accountant = new
TestMemoryCheckAccountant(config, true, 2000L);
+
+ // Clear the active query
+ accountant._threadEntry._currentThreadTaskStatus.set(null);
+
+ // Should not interrupt when no active query
+ accountant.checkMemoryAndInterruptIfExceeded();
+
+ // Verify no error was set
+ assertEquals(accountant.getErrorStatus(), null);
+ assertTrue(accountant._anchorThread.isAlive());
+ }
+
+ /**
+ * Test helper class for memory checking functionality
+ */
+ private static class TestMemoryCheckAccountant
+ extends
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
+ final CPUMemThreadLevelAccountingObjects.ThreadEntry _threadEntry;
+ final Thread _anchorThread;
+
+ TestMemoryCheckAccountant(PinotConfiguration config, boolean
isMemorySamplingEnabled, long memoryUsage) {
+ super(config, false, isMemorySamplingEnabled, true, new HashSet<>(),
"testInstance", InstanceType.SERVER);
Review Comment:
[nitpick] The hardcoded boolean values (false, true) in the constructor call
make the test setup unclear. Consider using named constants or variables to
clarify what these parameters represent.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1529,6 +1529,14 @@ public static class Accounting {
"accounting.min.memory.footprint.to.kill.ratio";
public static final double DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO = 0.025;
+ // Per-query memory threshold configurations
+ public static final String CONFIG_OF_ENABLE_PER_QUERY_MEMORY_CHECK =
"accounting.per.query.memory.check.enabled";
+ public static final boolean DEFAULT_ENABLE_PER_QUERY_MEMORY_CHECK = false;
+
+ public static final String CONFIG_OF_PER_QUERY_MEMORY_LIMIT_BYTES =
"accounting.per.query.memory.limit.bytes";
+ public static final long DEFAULT_PER_QUERY_MEMORY_LIMIT_BYTES =
Runtime.getRuntime().maxMemory() / 3;
+ // 1/3 of heap size
+
Review Comment:
The default memory limit calculation using Runtime.getRuntime().maxMemory()
/ 3 is computed at class loading time, which may not reflect runtime heap
changes or could cause issues if maxMemory() returns -1. Consider moving this
calculation to a method or documenting the static initialization behavior.
```suggestion
public static long getDefaultPerQueryMemoryLimitBytes() {
long maxMemory = Runtime.getRuntime().maxMemory();
return (maxMemory > 0) ? maxMemory / 3 : 0; // Safeguard for
maxMemory() returning -1
}
```
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -291,6 +310,39 @@ public boolean throttleQuerySubmission() {
return getWatcherTask().getHeapUsageBytes() >
getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
}
+ public void checkMemoryAndInterruptIfExceeded() {
+ if (!_isPerQueryMemoryCheckEnabled || !_isThreadMemorySamplingEnabled) {
+ return;
+ }
+
+ CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
_threadLocalEntry.get();
+ CPUMemThreadLevelAccountingObjects.TaskEntry currentTaskStatus =
threadEntry.getCurrentThreadTaskStatus();
+
+ if (currentTaskStatus == null) {
+ return; // No active query on this thread
+ }
+
+ long currentMemoryUsage =
threadEntry._currentThreadMemoryAllocationSampleBytes;
+ if (currentMemoryUsage > _perQueryMemoryLimitBytes) {
+ String queryId = currentTaskStatus.getQueryId();
+ String errorMessage = String.format(
+ "Query %s exceeded per-query memory limit of %d bytes (current
usage: %d bytes) on %s: %s. "
+ + "Query terminated proactively to prevent OOM.",
+ queryId, _perQueryMemoryLimitBytes, currentMemoryUsage,
_instanceType, _instanceId);
+
+ // Set error status to terminate the query
+ threadEntry._errorStatus.set(new RuntimeException(errorMessage));
Review Comment:
Setting a generic RuntimeException for memory limit exceeded scenarios makes
it difficult for callers to handle this specific error type. Consider creating
a dedicated exception class like 'QueryMemoryLimitExceededException' for better
error handling and monitoring.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]