praveenc7 commented on code in PR #16378:
URL: https://github.com/apache/pinot/pull/16378#discussion_r2218144756
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java:
##########
@@ -44,6 +44,10 @@ public final T nextBlock() {
     if (Tracing.ThreadAccountantOps.isInterrupted()) {
       throw new EarlyTerminationException("Interrupted while processing next block");
     }
+
+    // Check per-thread memory usage and terminate query proactively if threshold exceeded
Review Comment:
Could this be overly chatty for light operators that iterate in a tight loop
(e.g., a FilterOperator on a single segment)? Would either of the following
additions make sense?
1. Short-circuit if the thread's memory sample hasn't changed since the last check.
2. Sample every N blocks (configurable) instead of every block.
Even though each check is cheap, doing it 1000 times for a query that
finishes in ~10 ms adds measurable overhead (extra branches, TLS lookups,
potential volatile reads).
For heavier operators (joins, aggregations) the added cost is negligible,
but for "light" ones the ratio might be higher.
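To make the suggestion concrete, here is a minimal sketch combining both options; the class, field, and parameter names are illustrative only, not actual Pinot APIs:

```java
// Illustrative sketch of the two suggestions above. SampledMemoryCheck and its
// members are hypothetical names, not part of the Pinot codebase.
public class SampledMemoryCheck {
  private final int _sampleInterval;   // suggestion 2: check every N blocks, configurable
  private int _blocksSinceLastCheck = 0;
  private long _lastSampledBytes = -1; // suggestion 1: short-circuit on unchanged sample

  public SampledMemoryCheck(int sampleInterval) {
    _sampleInterval = sampleInterval;
  }

  /** Returns true only when a full memory check should run for this block. */
  public boolean shouldCheck(long currentSampleBytes) {
    if (++_blocksSinceLastCheck < _sampleInterval) {
      return false; // skip N-1 out of every N blocks
    }
    _blocksSinceLastCheck = 0;
    if (currentSampleBytes == _lastSampledBytes) {
      return false; // sample unchanged since last check, nothing new to evaluate
    }
    _lastSampledBytes = currentSampleBytes;
    return true;
  }
}
```

The operator would call `shouldCheck(...)` at the top of `nextBlock()` and only fall through to the full alarm/critical/panic evaluation when it returns true.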
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -292,6 +328,157 @@ public boolean throttleQuerySubmission() {
     return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
   }
+  /**
+   * Thread-safe method to check memory usage and interrupt queries if needed.
+   * Uses synchronized blocks and atomic operations to prevent race conditions
+   * when multiple threads attempt to cancel the same query.
+   */
+  public void checkMemoryAndInterruptIfExceeded() {
+    WatcherTask watcherTask = getWatcherTask();
+    QueryMonitorConfig config = watcherTask.getQueryMonitorConfig();
+    LOGGER.debug("Operator Checking memory usage: {}, alarming level {}, critical level {}, panic level {}",
+        watcherTask.getHeapUsageBytes(), config.getAlarmingLevel(), config.getCriticalLevel(),
+        config.getPanicLevel());
+
+    while (watcherTask.getHeapUsageBytes() > config.getAlarmingLevel()) {
+      long currentHeapUsage = watcherTask.getHeapUsageBytes();
+      long alarmingLevel = config.getAlarmingLevel();
+      long criticalLevel = config.getCriticalLevel();
+      long panicLevel = config.getPanicLevel();
+
+      // Get current thread's execution context
+      ThreadExecutionContext context = getThreadExecutionContext();
+      if (context == null || context.getQueryId() == null) {
+        // No query context, just break out
+        break;
+      }
+
+      String currentQueryId = context.getQueryId();
+
+      // Thread-safe check: Skip if query already cancelled
+      if (_cancelSentQueries.contains(currentQueryId)) {
+        // Query is already being canceled, but ensure the anchor thread is interrupted.
+        if (context.getAnchorThread() != null && !context.getAnchorThread().isInterrupted()) {
+          context.getAnchorThread().interrupt();
+        }
+        break;
+      }
+
+      // Kill if heap is at panic level (scenario 3)
+      if (currentHeapUsage >= panicLevel) {
+        // Synchronized block to prevent race conditions in query cancellation
+        synchronized (this) {
+          // Double-check if query is still not cancelled
+          if (!_cancelSentQueries.contains(currentQueryId)) {
+            CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = _threadLocalEntry.get();
+            threadEntry._errorStatus.set(new RuntimeException(
+                String.format("Query %s killed due to panic level heap usage: %d bytes (panic threshold: %d bytes)",
+                    currentQueryId, currentHeapUsage, panicLevel)));
+
+            if (context.getAnchorThread() != null) {
+              cancelQuery(currentQueryId, context.getAnchorThread());
+            }
+          }
+        }
+        break;
+      }
+
+      // Get current query resource usage (thread-safe snapshot)
+      final Map<String, ? extends QueryResourceTracker> queryResources = getQueryResources();
+      final QueryResourceTracker currentQueryTracker = queryResources.get(currentQueryId);
+
+      // Kill if heap is at critical level (scenarios 1 and 2)
+      if (currentHeapUsage >= criticalLevel && currentHeapUsage < panicLevel) {
+        boolean shouldKill = false;
+        String killReason = "";
+
+        // Scenario 1: heap is at critical level and the query exceeds the per-query heap limit
+        if (_isPerQueryMemoryCheckEnabled && currentQueryTracker != null
+            && currentQueryTracker.getAllocatedBytes() > _perQueryMemoryLimitBytes) {
+          shouldKill = true;
+          killReason = String.format(
+              "Query %s killed due to critical heap level and exceeding per-query limit: %d bytes (limit: %d bytes)",
+              currentQueryId, currentQueryTracker.getAllocatedBytes(), _perQueryMemoryLimitBytes);
+        } else {
+          // Scenario 2: heap is at critical level and no query exceeds its limit; still kill
+          // the query with the most heap usage to free up memory for other queries
+          QueryResourceTracker maxUsageQuery = queryResources.values().stream()
+              .filter(tracker -> !_cancelSentQueries.contains(tracker.getQueryId()))
+              .max(Comparator.comparing(QueryResourceTracker::getAllocatedBytes))
+              .orElse(null);
+
+          if (maxUsageQuery != null && maxUsageQuery.getQueryId().equals(currentQueryId)) {
+            shouldKill = true;
+            killReason = String.format("Query %s killed due to critical heap level as highest memory consumer: %d bytes",
+                currentQueryId, currentQueryTracker != null ? currentQueryTracker.getAllocatedBytes() : 0);
+          }
+        }
+
+        if (shouldKill) {
+          LOGGER.debug("Trying to kill query: {}, reason: {}", currentQueryId, killReason);
+          // Synchronized block to prevent race conditions in query cancellation
+          synchronized (this) {
+            // Double-check if query is still not cancelled
+            if (!_cancelSentQueries.contains(currentQueryId)) {
+              CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = _threadLocalEntry.get();
+              threadEntry._errorStatus.set(new RuntimeException(killReason));

+              if (context.getAnchorThread() != null) {
+                cancelQuery(currentQueryId, context.getAnchorThread());
+              }
+            }
+          }
+          break;
+        }
+      }
+
+      // Handle alarm level scenario
+      if (currentHeapUsage >= alarmingLevel && currentHeapUsage < criticalLevel) {
+        // Determine if current query should proceed based on global query ID grouping and leaf stage priority
+        boolean shouldProceed = false;
+
+        if (!queryResources.isEmpty()) {
+          // Get the start time of the current query from QueryThreadContext
+          long currentQueryStartTime = QueryThreadContext.getStartTimeMs();
+
+          if (currentQueryStartTime > 0) {
+            // Group queries by global query ID and prioritize leaf stages (later start times) within each group
+            String currentGlobalQueryId = extractGlobalQueryId(currentQueryId);
+
+            // Find the query that should proceed based on the new logic
+            String selectedQueryId = findQueryToProcessAtAlarmLevel(queryResources, currentGlobalQueryId,
+                currentQueryId, currentQueryStartTime);
+
+            shouldProceed = selectedQueryId != null && selectedQueryId.equals(currentQueryId);
+
+            // Log waiting status for debugging
+            if (!shouldProceed) {
+              LOGGER.debug("Query {} waiting due to alarm level heap usage. Selected query to proceed: {}",
+                  currentQueryId, selectedQueryId);
+            }
+          }
+        }
+
+        if (shouldProceed) {
+          // Proceed when the query is selected to proceed
+          LOGGER.debug("Query {} proceeding as it is selected based on global query ID grouping and leaf stage priority",
+              currentQueryId);
+          break;
+        } else {
+          // Wait and let the outer loop re-evaluate - pause 100ms before next iteration
+          try {
+            Thread.sleep(100);
Review Comment:
Do we want to consider exponential backoff here?
It might reduce the latency penalty if memory pressure clears quickly.
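Something like the following, where the base delay, cap, and class name are all hypothetical tuning choices rather than anything in the PR:

```java
// Illustrative exponential backoff with a cap, as an alternative to the fixed
// 100 ms sleep. Constants and the Backoff name are hypothetical.
public final class Backoff {
  private Backoff() {
  }

  /** Delay for the given retry attempt: baseMs * 2^attempt, capped at maxMs. */
  public static long delayMs(int attempt, long baseMs, long maxMs) {
    long delay = baseMs << Math.min(attempt, 20); // clamp the shift to avoid overflow
    return Math.min(delay, maxMs);
  }
}
```

The waiting branch would then track an attempt counter and call, e.g., `Thread.sleep(Backoff.delayMs(attempt++, 10, 1000))`, so the first few re-checks happen within tens of milliseconds while sustained pressure still backs off to a bounded sleep.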
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -195,6 +214,23 @@ public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String i
         CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
     LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE);
+    _isPerQueryMemoryCheckEnabled = config.getProperty(
+        CommonConstants.Accounting.CONFIG_OF_PER_THREAD_QUERY_MEMORY_CHECK_ENABLED,
+        CommonConstants.Accounting.DEFAULT_PER_THREAD_QUERY_MEMORY_CHECK_ENABLED);
+    long configuredLimit = config.getProperty(
+        CommonConstants.Accounting.CONFIG_OF_PER_THREAD_QUERY_MEMORY_LIMIT_BYTES,
+        CommonConstants.Accounting.DEFAULT_PER_THREAD_QUERY_MEMORY_LIMIT_BYTES);
+    // If using default value, dynamically calculate based on actual heap size for better defaults
+    if (configuredLimit == CommonConstants.Accounting.DEFAULT_PER_THREAD_QUERY_MEMORY_LIMIT_BYTES) {
+      long maxHeapMemory = Runtime.getRuntime().maxMemory();
Review Comment:
Inside a cgroup, sometimes `memory.limit_in_bytes` is lower than the JVM `-Xmx`.
When computing the default, is there a way to compare `maxMemory()` with
`/sys/fs/cgroup/memory.max` if present?
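A rough sketch of what I mean, taking the minimum of the JVM heap and any cgroup limit found; the paths cover cgroup v2 (`memory.max`) and v1 (`memory.limit_in_bytes`), and the parsing rules ("max" or a huge sentinel value meaning unlimited) are my assumptions, not Pinot code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for comparing Runtime.maxMemory() with the cgroup limit.
public final class CgroupMemory {
  private CgroupMemory() {
  }

  /** Parses a cgroup memory-limit file's content; returns -1 if unlimited or unparseable. */
  public static long parseLimit(String content) {
    String trimmed = content.trim();
    if (trimmed.isEmpty() || "max".equals(trimmed)) {
      return -1; // cgroup v2 reports the literal "max" when no limit is set
    }
    try {
      long value = Long.parseLong(trimmed);
      // cgroup v1 reports a huge page-rounded sentinel near Long.MAX_VALUE when unlimited
      return value > 0 && value < (1L << 60) ? value : -1;
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  /** Effective memory budget: min of the JVM max heap and any cgroup limit found. */
  public static long effectiveLimit(long jvmMaxBytes) {
    for (String file : new String[] {"/sys/fs/cgroup/memory.max",
        "/sys/fs/cgroup/memory/memory.limit_in_bytes"}) {
      try {
        long cgroupLimit = parseLimit(Files.readString(Path.of(file)));
        if (cgroupLimit > 0) {
          return Math.min(jvmMaxBytes, cgroupLimit);
        }
      } catch (IOException ignored) {
        // File absent: not running under that cgroup version
      }
    }
    return jvmMaxBytes;
  }
}
```

One caveat: recent JVMs with container support enabled (the default on modern JDKs) already cap `maxMemory()` at the container limit, so this may mainly matter on older runtimes or when that support is disabled.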
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]