This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 12e6c885d82 Remove all calls to System.gc() in PerQueryCPUMemAccountantFactory (#16374)
12e6c885d82 is described below
commit 12e6c885d820511c28acd0cce0ca2ccf0ef39738
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Jul 18 12:27:50 2025 +0530
Remove all calls to System.gc() in PerQueryCPUMemAccountantFactory (#16374)
---
.../PerQueryCPUMemAccountantFactory.java | 43 +++----------
.../pinot/core/accounting/QueryMonitorConfig.java | 72 ----------------------
.../core/accounting/QueryMonitorConfigTest.java | 55 -----------------
3 files changed, 9 insertions(+), 161 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 0531a586962..71944bd6108 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -160,6 +160,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
_instanceType = instanceType;
_cancelSentQueries = new HashSet<>();
_watcherTask = createWatcherTask();
+ _queryCancelCallbacks = CacheBuilder.newBuilder().build();
}
public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config,
String instanceId,
@@ -564,10 +565,11 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
_cancelSentQueries.add(queryId);
}
- protected void logTerminatedQuery(QueryResourceTracker
queryResourceTracker, long totalHeapMemoryUsage) {
- LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total
Heap Usage: {}",
+ protected void logTerminatedQuery(QueryResourceTracker
queryResourceTracker, long totalHeapMemoryUsage,
+ boolean hasCallback) {
+ LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total
Heap Usage: {}. Used Callback: {}",
queryResourceTracker.getQueryId(),
queryResourceTracker.getAllocatedBytes(),
- queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage);
+ queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage,
hasCallback);
}
@Override
@@ -661,7 +663,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
protected long _usedBytes;
protected int _sleepTime;
- protected int _numQueriesKilledConsecutively = 0;
protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
protected TriggeringLevel _triggeringLevel;
@@ -744,10 +745,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
LOGGER.info("_instanceType is {}", _instanceType);
LOGGER.info("_alarmingLevel of on heap memory is {}",
queryMonitorConfig.getAlarmingLevel());
LOGGER.info("_criticalLevel of on heap memory is {}",
queryMonitorConfig.getCriticalLevel());
- LOGGER.info("_criticalLevelAfterGC of on heap memory is {}",
queryMonitorConfig.getCriticalLevelAfterGC());
LOGGER.info("_panicLevel of on heap memory is {}",
queryMonitorConfig.getPanicLevel());
- LOGGER.info("_gcBackoffCount is {}",
queryMonitorConfig.getGcBackoffCount());
- LOGGER.info("_gcWaitTime is {}", queryMonitorConfig.getGcWaitTime());
LOGGER.info("_normalSleepTime is {}",
queryMonitorConfig.getNormalSleepTime());
LOGGER.info("_alarmingSleepTime is {}",
queryMonitorConfig.getAlarmingSleepTime());
LOGGER.info("_oomKillQueryEnabled: {}",
queryMonitorConfig.isOomKillQueryEnabled());
@@ -873,10 +871,8 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
case HeapMemoryAlarmingVerbose:
LOGGER.warn("Heap used bytes {} exceeds alarming level",
_usedBytes);
LOGGER.warn("Query usage aggregation results {}",
_aggregatedUsagePerActiveQuery.toString());
- _numQueriesKilledConsecutively = 0;
break;
default:
- _numQueriesKilledConsecutively = 0;
break;
}
}
@@ -904,13 +900,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
if (config.isQueryKilledMetricEnabled()) {
_metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
}
- try {
- Thread.sleep(config.getNormalSleepTime());
- } catch (InterruptedException ignored) {
- }
- // In this extreme case we directly trigger system.gc
- System.gc();
- _numQueriesKilledConsecutively = 0;
}
}
@@ -925,21 +914,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
return;
}
QueryMonitorConfig config = _queryMonitorConfig.get();
- if (_aggregatedUsagePerActiveQuery != null &&
!_aggregatedUsagePerActiveQuery.isEmpty()
- && _numQueriesKilledConsecutively >= config.getGcBackoffCount()) {
- _numQueriesKilledConsecutively = 0;
- System.gc();
- try {
- Thread.sleep(config.getGcWaitTime());
- } catch (InterruptedException ignored) {
- }
- _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
- if (_usedBytes < config.getCriticalLevelAfterGC()) {
- return;
- }
- LOGGER.error("After GC, heap used bytes {} still exceeds
_criticalLevelAfterGC level {}", _usedBytes,
- config.getCriticalLevelAfterGC());
- }
// Critical heap memory usage while no queries running
if (_aggregatedUsagePerActiveQuery != null &&
!_aggregatedUsagePerActiveQuery.isEmpty()) {
AggregatedStats maxUsageTuple;
@@ -953,8 +927,9 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
maxUsageTuple._exceptionAtomicReference.set(new RuntimeException(
String.format(" Query %s got killed because using %d bytes
of memory on %s: %s, exceeding the quota",
maxUsageTuple._queryId,
maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
+ boolean hasCallBack =
_queryCancelCallbacks.getIfPresent(maxUsageTuple.getQueryId()) != null;
terminateQuery(maxUsageTuple);
- logTerminatedQuery(maxUsageTuple, _usedBytes);
+ logTerminatedQuery(maxUsageTuple, _usedBytes, hasCallBack);
} else if (!config.isOomKillQueryEnabled()) {
LOGGER.warn("Query {} got picked because using {} bytes of
memory, actual kill committed false "
+ "because oomKillQueryEnabled is false",
maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
@@ -981,8 +956,9 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
String.format("Query %s got killed on %s: %s because using %d "
+ "CPU time exceeding limit of %d ns CPU time",
value._queryId, _instanceType, _instanceId,
value.getCpuTimeNs(),
config.getCpuTimeBasedKillingThresholdNS())));
+ boolean hasCallBack =
_queryCancelCallbacks.getIfPresent(value.getQueryId()) != null;
terminateQuery(value);
- logTerminatedQuery(value, _usedBytes);
+ logTerminatedQuery(value, _usedBytes, hasCallBack);
}
}
logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
@@ -993,7 +969,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
if (_queryMonitorConfig.get().isQueryKilledMetricEnabled()) {
_metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
}
- _numQueriesKilledConsecutively += 1;
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
index ba4b05aa81e..327b34e4bd2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -36,28 +36,12 @@ public class QueryMonitorConfig {
// kill the most expensive query if heap usage exceeds this
private final long _criticalLevel;
- // if after gc the heap usage is still above this, kill the most expensive
query
- // use this to prevent heap size oscillation and repeatedly triggering gc
- private final long _criticalLevelAfterGC;
-
- // trigger gc if consecutively kill more than some number of queries
- // set this to 0 to always trigger gc before killing a query to give gc a
second chance
- // as would minimize the chance of false positive killing in some usecases
- // should consider use -XX:+ExplicitGCInvokesConcurrent to avoid STW for
some gc algorithms
- private final int _gcBackoffCount;
-
// start to sample more frequently if heap usage exceeds this
private final long _alarmingLevel;
// normal sleep time
private final int _normalSleepTime;
- // wait for gc to complete, according to system.gc() javadoc, when control
returns from the method call,
- // the Java Virtual Machine has made a best effort to reclaim space from all
discarded objects.
- // Therefore, we default this to 0.
- // Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent
- private final int _gcWaitTime;
-
// alarming sleep time denominator, should be > 1 to sample more frequent at
alarming level
private final int _alarmingSleepTimeDenominator;
@@ -94,13 +78,6 @@ public class QueryMonitorConfig {
(long) (maxHeapSize *
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO));
- _criticalLevelAfterGC = _criticalLevel - (long) (maxHeapSize *
config.getProperty(
-
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC,
-
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC));
-
- _gcBackoffCount =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT,
- CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
-
_alarmingLevel =
(long) (maxHeapSize *
config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO,
CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO));
@@ -108,9 +85,6 @@ public class QueryMonitorConfig {
_normalSleepTime =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS,
CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS);
- _gcWaitTime =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS,
- CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
-
_alarmingSleepTimeDenominator =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR,
CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR);
@@ -174,30 +148,6 @@ public class QueryMonitorConfig {
_criticalLevel = oldConfig._criticalLevel;
}
- if
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC))
{
- if (clusterConfigs == null || !clusterConfigs.containsKey(
-
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC))
{
- _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize
- *
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC);
- } else {
- _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize *
Double.parseDouble(
-
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)));
- }
- } else {
- _criticalLevelAfterGC = oldConfig._criticalLevelAfterGC;
- }
-
- if
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT))
{
- if (clusterConfigs == null || !clusterConfigs.containsKey(
- CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)) {
- _gcBackoffCount = CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT;
- } else {
- _gcBackoffCount =
Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT));
- }
- } else {
- _gcBackoffCount = oldConfig._gcBackoffCount;
- }
-
if
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO))
{
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)) {
@@ -220,16 +170,6 @@ public class QueryMonitorConfig {
_normalSleepTime = oldConfig._normalSleepTime;
}
- if
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS))
{
- if (clusterConfigs == null ||
!clusterConfigs.containsKey(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS))
{
- _gcWaitTime =
CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS;
- } else {
- _gcWaitTime =
Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS));
- }
- } else {
- _gcWaitTime = oldConfig._gcWaitTime;
- }
-
if
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR))
{
if (clusterConfigs == null || !clusterConfigs.containsKey(
CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)) {
@@ -323,14 +263,6 @@ public class QueryMonitorConfig {
return _criticalLevel;
}
- public long getCriticalLevelAfterGC() {
- return _criticalLevelAfterGC;
- }
-
- public int getGcBackoffCount() {
- return _gcBackoffCount;
- }
-
public long getAlarmingLevel() {
return _alarmingLevel;
}
@@ -339,10 +271,6 @@ public class QueryMonitorConfig {
return _normalSleepTime;
}
- public int getGcWaitTime() {
- return _gcWaitTime;
- }
-
public int getAlarmingSleepTime() {
return _alarmingSleepTime;
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
index f9b66a72205..d5afcd63d66 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/QueryMonitorConfigTest.java
@@ -36,11 +36,8 @@ public class QueryMonitorConfigTest {
private static final double EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL = 0.05;
private static final double EXPECTED_PANIC_LEVEL = 0.9f;
private static final double EXPECTED_CRITICAL_LEVEL = 0.95f;
- private static final double EXPECTED_CRITICAL_LEVEL_AFTER_GC = 0.05f;
- private static final int EXPECTED_GC_BACKOFF_COUNT = 3;
private static final double EXPECTED_ALARMING_LEVEL = 0.8f;
private static final int EXPECTED_NORMAL_SLEEP_TIME = 50;
- private static final int EXPECTED_GC_WAIT_TIME = 1000;
private static final int EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR = 2;
private static final boolean EXPECTED_OOM_KILL_QUERY_ENABLED = true;
private static final boolean EXPECTED_PUBLISH_HEAP_USAGE_METRIC = true;
@@ -70,9 +67,6 @@ public class QueryMonitorConfigTest {
CLUSTER_CONFIGS.put(
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO),
Double.toString(EXPECTED_CRITICAL_LEVEL));
- CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(
-
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC),
- Double.toString(EXPECTED_CRITICAL_LEVEL_AFTER_GC));
CLUSTER_CONFIGS.put(
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO),
Double.toString(EXPECTED_ALARMING_LEVEL));
@@ -83,10 +77,6 @@ public class QueryMonitorConfigTest {
CLUSTER_CONFIGS.put(
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO),
Double.toString(EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL));
-
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT),
- Integer.toString(EXPECTED_GC_BACKOFF_COUNT));
-
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS),
- Integer.toString(EXPECTED_GC_WAIT_TIME));
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED),
Boolean.toString(EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED));
}
@@ -177,23 +167,6 @@ public class QueryMonitorConfigTest {
EXPECTED_CRITICAL_LEVEL *
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
}
- @Test
- void testCriticalLevelHeapUsageRatioDeltaAfterGCConfigChange() {
- PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
- new
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new
PinotConfiguration(), "test",
- InstanceType.SERVER);
-
-
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(),
- accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel()
- -
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC
- *
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
- accountant.getWatcherTask().onChange(Set.of(getFullyQualifiedConfigName(
-
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)),
CLUSTER_CONFIGS);
-
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(),
- accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel()
- - EXPECTED_CRITICAL_LEVEL_AFTER_GC *
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
- }
-
@Test
void testAlarmingLevelHeapUsageRatioConfigChange() {
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
@@ -258,34 +231,6 @@ public class QueryMonitorConfigTest {
.getMaxHeapSize()));
}
- @Test
- void testGCBackoffCountConfigChange() {
- PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
- new
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new
PinotConfiguration(), "test",
- InstanceType.SERVER);
-
-
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(),
- CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
- accountant.getWatcherTask()
-
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)),
- CLUSTER_CONFIGS);
-
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(),
EXPECTED_GC_BACKOFF_COUNT);
- }
-
- @Test
- void testGCWaitTimeConfigChange() {
- PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
- new
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new
PinotConfiguration(), "test",
- InstanceType.SERVER);
-
-
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(),
- CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
- accountant.getWatcherTask()
-
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)),
- CLUSTER_CONFIGS);
-
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(),
EXPECTED_GC_WAIT_TIME);
- }
-
@Test
void testQueryKilledMetricEnabledConfigChange() {
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]