This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c8eb53a4d3 Make OOM Protection work with GRPC Queries (#16004)
c8eb53a4d3 is described below
commit c8eb53a4d3823700956badc29eac4c6ed81e0b7d
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Wed Jun 11 16:53:45 2025 +0530
Make OOM Protection work with GRPC Queries (#16004)
---
.../CPUMemThreadLevelAccountingObjects.java | 5 +-
.../PerQueryCPUMemAccountantFactory.java | 41 ++++++++++----
.../tests/OfflineGRPCServerIntegrationTest.java | 2 +-
...lineGRPCServerOOMAccountingIntegrationTest.java | 46 +++++++++++++++
.../pinot/query/runtime/operator/OpChain.java | 2 +
.../spi/accounting/ThreadExecutionContext.java | 2 +-
.../spi/accounting/ThreadResourceTracker.java | 2 +
.../accounting/ThreadResourceUsageAccountant.java | 23 +++++++-
.../java/org/apache/pinot/spi/trace/Tracing.java | 66 +++++++---------------
9 files changed, 125 insertions(+), 64 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 0c2e0daee4..eb7ac33e2b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -93,9 +93,10 @@ public class CPUMemThreadLevelAccountingObjects {
return _currentThreadMemoryAllocationSampleBytes;
}
+ @Nullable
public String getQueryId() {
TaskEntry taskEntry = _currentThreadTaskStatus.get();
- return taskEntry == null ? "" : taskEntry.getQueryId();
+ return taskEntry == null ? null : taskEntry.getQueryId();
}
public int getTaskId() {
@@ -109,7 +110,7 @@ public class CPUMemThreadLevelAccountingObjects {
return taskEntry == null ? ThreadExecutionContext.TaskType.UNKNOWN :
taskEntry.getTaskType();
}
- public void setThreadTaskStatus(@Nullable String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ public void setThreadTaskStatus(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
@Nonnull Thread anchorThread) {
_currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType,
anchorThread));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 4f1fa0087f..2430ce7895 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -49,7 +49,6 @@ import
org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +66,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
return new PerQueryCPUMemResourceUsageAccountant(config, instanceId,
instanceType);
}
- public static class PerQueryCPUMemResourceUsageAccountant extends
Tracing.DefaultThreadResourceUsageAccountant {
+ public static class PerQueryCPUMemResourceUsageAccountant implements
ThreadResourceUsageAccountant {
/**
* MemoryMXBean to get total heap used memory
@@ -254,6 +253,22 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
}
+ @Override
+ public boolean isAnchorThreadInterrupted() {
+ ThreadExecutionContext context =
_threadLocalEntry.get().getCurrentThreadTaskStatus();
+ if (context != null && context.getAnchorThread() != null) {
+ return context.getAnchorThread().isInterrupted();
+ }
+
+ return false;
+ }
+
+ @Override
+ @Deprecated
+ public void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ @Nullable ThreadExecutionContext parentContext) {
+ }
+
/**
* for testing only
*/
@@ -310,22 +325,26 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
@Override
- public void createExecutionContextInner(@Nullable String queryId, int
taskId,
- ThreadExecutionContext.TaskType taskType, @Nullable
ThreadExecutionContext parentContext) {
+ public void setupRunner(@Nullable String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
_threadLocalEntry.get()._errorStatus.set(null);
- if (parentContext == null) {
- // is anchor thread
- assert queryId != null;
+ if (queryId != null) {
_threadLocalEntry.get()
.setThreadTaskStatus(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread());
- } else {
- // not anchor thread
- _threadLocalEntry.get().setThreadTaskStatus(queryId, taskId,
parentContext.getTaskType(),
+ }
+ }
+
+ @Override
+ public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext parentContext) {
+ _threadLocalEntry.get()._errorStatus.set(null);
+ if (parentContext != null && parentContext.getQueryId() != null &&
parentContext.getAnchorThread() != null) {
+
_threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId,
parentContext.getTaskType(),
parentContext.getAnchorThread());
}
}
@Override
+ @Nullable
public ThreadExecutionContext getThreadExecutionContext() {
return _threadLocalEntry.get().getCurrentThreadTaskStatus();
}
@@ -345,8 +364,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
threadEntry.setToIdle();
// clear threadResourceUsageProvider
_threadResourceUsageProvider.remove();
- // clear _anchorThread
- super.clear();
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 5f08d73a36..461376b714 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -244,7 +244,7 @@ public class OfflineGRPCServerIntegrationTest extends
BaseClusterIntegrationTest
if
(responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
// verify the returned data table metadata only contains
"responseSerializationCpuTimeNs".
Map<String, String> metadata = dataTable.getMetadata();
- assertTrue(metadata.size() == 1 &&
metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
+
assertTrue(metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
assertNotNull(dataTable.getDataSchema());
numTotalDocs += dataTable.getNumberOfRows();
} else {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java
new file mode 100644
index 0000000000..2ffe028f9c
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class OfflineGRPCServerOOMAccountingIntegrationTest extends
OfflineGRPCServerIntegrationTest {
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty(
+ CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." +
CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+ "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+ serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+ + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
+ serverConf.setProperty(
+ CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+ + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
true);
+ serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+ + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
true);
+
serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
true);
+
serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
true);
+ serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+ + CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED,
true);
+ }
+
+ protected void overrideBrokerConf(PinotConfiguration serverConf) {
+ overrideServerConf(serverConf);
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index cd2210a0f0..3e9651ac74 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime.operator;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.slf4j.Logger;
@@ -63,6 +64,7 @@ public class OpChain implements AutoCloseable {
return _root;
}
+ @Nullable
public ThreadExecutionContext getParentContext() {
return _parentContext;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
index 7ea59ad378..320f187605 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
@@ -28,7 +28,7 @@ public interface ThreadExecutionContext {
* MSE: Multi Stage Engine
* UNKNOWN: Default
*/
- public enum TaskType {
+ enum TaskType {
SSE,
MSE,
UNKNOWN
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
index 418210b376..d084bf40d4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.accounting;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import javax.annotation.Nullable;
/**
@@ -42,6 +43,7 @@ public interface ThreadResourceTracker {
* QueryId of the task the thread is executing.
* @return a string containing the query id.
*/
+ @Nullable
String getQueryId();
/**
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index a3d1061dd6..6a33ac3ac0 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -37,17 +37,34 @@ public interface ThreadResourceUsageAccountant {
boolean isAnchorThreadInterrupted();
/**
- * Task tracking info
+ * This method has been deprecated and replaced by {@link
#setupRunner(String, int, ThreadExecutionContext.TaskType)}
+ * and {@link #setupWorker(int, ThreadExecutionContext.TaskType,
ThreadExecutionContext)}.
+ */
+ @Deprecated
+ void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ @Nullable ThreadExecutionContext parentContext);
+
+ /**
+ * Set up the thread execution context for an anchor (a.k.a. runner) thread.
* @param queryId query id string
* @param taskId a unique task id
- * @param parentContext the parent execution context, null for root(runner)
thread
+ * @param taskType the type of the task - SSE or MSE
*/
- void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ void setupRunner(String queryId, int taskId, ThreadExecutionContext.TaskType
taskType);
+
+ /**
+ * Set up the thread execution context for a worker thread.
+ * @param taskId a unique task id
+ * @param taskType the type of the task - SSE or MSE
+ * @param parentContext the parent execution context
+ */
+ void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
@Nullable ThreadExecutionContext parentContext);
/**
* get the executon context of current thread
*/
+ @Nullable
ThreadExecutionContext getThreadExecutionContext();
/**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 964a64bad1..198d083c57 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
@@ -172,22 +171,23 @@ public class Tracing {
*/
public static class DefaultThreadResourceUsageAccountant implements
ThreadResourceUsageAccountant {
- // worker thread's corresponding anchor thread, worker will also interrupt
if it finds anchor's flag is raised
- private final ThreadLocal<Thread> _anchorThread;
-
- public DefaultThreadResourceUsageAccountant() {
- _anchorThread = new ThreadLocal<>();
+ @Override
+ public boolean isAnchorThreadInterrupted() {
+ return false;
}
@Override
- public boolean isAnchorThreadInterrupted() {
- Thread thread = _anchorThread.get();
- return thread != null && thread.isInterrupted();
+ public void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ @Nullable ThreadExecutionContext parentContext) {
+ }
+
+ @Deprecated
+ public void createExecutionContextInner(@Nullable String queryId, int
taskId,
+ ThreadExecutionContext.TaskType taskType, @Nullable
ThreadExecutionContext parentContext) {
}
@Override
public void clear() {
- _anchorThread.remove();
}
@Override
@@ -207,34 +207,18 @@ public class Tracing {
}
@Override
- public final void createExecutionContext(@Nullable String queryId, int
taskId,
- ThreadExecutionContext.TaskType taskType, @Nullable
ThreadExecutionContext parentContext) {
- _anchorThread.set(parentContext == null ? Thread.currentThread() :
parentContext.getAnchorThread());
- createExecutionContextInner(queryId, taskId, taskType, parentContext);
+ public void setupRunner(@Nullable String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
}
- public void createExecutionContextInner(@Nullable String queryId, int
taskId,
- ThreadExecutionContext.TaskType taskType, @Nullable
ThreadExecutionContext parentContext) {
+ @Override
+ public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext parentContext) {
}
@Override
+ @Nullable
public ThreadExecutionContext getThreadExecutionContext() {
- return new ThreadExecutionContext() {
- @Override
- public String getQueryId() {
- return null;
- }
-
- @Override
- public Thread getAnchorThread() {
- return _anchorThread.get();
- }
-
- @Override
- public TaskType getTaskType() {
- return TaskType.UNKNOWN;
- }
- };
+ return null;
}
@Override
@@ -267,14 +251,13 @@ public class Tracing {
private ThreadAccountantOps() {
}
- public static void setupRunner(@Nonnull String queryId) {
+ public static void setupRunner(String queryId) {
setupRunner(queryId, ThreadExecutionContext.TaskType.SSE);
}
- public static void setupRunner(@Nonnull String queryId,
ThreadExecutionContext.TaskType taskType) {
+ public static void setupRunner(String queryId,
ThreadExecutionContext.TaskType taskType) {
Tracing.getThreadAccountant().setThreadResourceUsageProvider(new
ThreadResourceUsageProvider());
- Tracing.getThreadAccountant()
- .createExecutionContext(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, null);
+ Tracing.getThreadAccountant().setupRunner(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType);
}
/**
@@ -292,16 +275,9 @@ public class Tracing {
* @param threadExecutionContext Context holds metadata about the query.
*/
public static void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
- ThreadExecutionContext threadExecutionContext) {
+ @Nullable ThreadExecutionContext threadExecutionContext) {
Tracing.getThreadAccountant().setThreadResourceUsageProvider(new
ThreadResourceUsageProvider());
- String queryId = null;
- if (threadExecutionContext != null) {
- queryId = threadExecutionContext.getQueryId();
- } else {
- LOGGER.warn("Request ID not available. ParentContext not set for query
worker thread.");
- }
- Tracing.getThreadAccountant()
- .createExecutionContext(queryId, taskId, taskType,
threadExecutionContext);
+ Tracing.getThreadAccountant().setupWorker(taskId, taskType,
threadExecutionContext);
}
public static void sample() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]