This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf1e3dec758 Revert "Remove TaskType from ThreadExecutionContext (#16633)" (#16640)
cf1e3dec758 is described below
commit cf1e3dec758c57772148e2b42841931bc7024941
Author: Jhow <[email protected]>
AuthorDate: Tue Aug 19 15:58:46 2025 -0700
Revert "Remove TaskType from ThreadExecutionContext (#16633)" (#16640)
This reverts commit 72bf3bf26fa3d269daf5fd15a6dfbc43db6c8b66.
---
.../BaseSingleStageBrokerRequestHandler.java | 4 +++-
.../MultiStageBrokerRequestHandler.java | 4 +++-
.../CPUMemThreadLevelAccountingObjects.java | 25 +++++++++++++++++-----
.../PerQueryCPUMemAccountantFactory.java | 13 +++++------
.../accounting/ResourceUsageAccountantFactory.java | 21 +++++++++---------
.../core/query/reduce/GroupByDataTableReducer.java | 2 +-
.../accounting/PerQueryCPUMemAccountantTest.java | 21 +++++++++---------
.../core/accounting/TestResourceAccountant.java | 9 +++++---
.../runtime/executor/OpChainSchedulerService.java | 4 +++-
.../pinot/query/service/server/QueryServer.java | 3 ++-
.../runtime/operator/MultiStageAccountingTest.java | 5 ++---
.../MultistageResourceUsageAccountingTest.java | 15 ++++++-------
.../query/runtime/queries/QueryRunnerTestBase.java | 3 +--
.../spi/accounting/ThreadExecutionContext.java | 13 +++++++++++
.../spi/accounting/ThreadResourceTracker.java | 2 ++
.../accounting/ThreadResourceUsageAccountant.java | 7 ++++--
.../java/org/apache/pinot/spi/trace/Tracing.java | 24 +++++++++++++++++----
.../ThrottleOnCriticalHeapUsageExecutorTest.java | 6 ++++--
18 files changed, 120 insertions(+), 61 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index aa5f38f5ee8..199fbdf9d76 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -99,6 +99,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.query.parser.utils.ParserUtils;
import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.auth.TableRowColAccessResult;
@@ -325,7 +326,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
//Start instrumentation context. This must not be moved further below
interspersed into the code.
String workloadName =
QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions());
- _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
workloadName);
+ _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
ThreadExecutionContext.TaskType.SSE,
+ workloadName);
try {
return doHandleRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext,
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 601bdb0d9b7..a366fd07144 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -84,6 +84,7 @@ import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.auth.TableAuthorizationResult;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
@@ -533,7 +534,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
try {
String workloadName =
QueryOptionsUtils.getWorkloadName(query.getOptions());
- _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
workloadName);
+ _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
ThreadExecutionContext.TaskType.MSE,
+ workloadName);
long executionStartTimeNs = System.nanoTime();
QueryDispatcher.QueryResult queryResults;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 8a80e7d0f68..b6a0bc661a9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -108,8 +108,15 @@ public class CPUMemThreadLevelAccountingObjects {
return taskEntry == null ? -1 : taskEntry.getTaskId();
}
- public void setThreadTaskStatus(String queryId, int taskId, Thread
anchorThread, String workloadName) {
- _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId,
anchorThread, workloadName));
+ @Override
+ public ThreadExecutionContext.TaskType getTaskType() {
+ TaskEntry taskEntry = _currentThreadTaskStatus.get();
+ return taskEntry == null ? ThreadExecutionContext.TaskType.UNKNOWN :
taskEntry.getTaskType();
+ }
+
+ public void setThreadTaskStatus(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ Thread anchorThread, String workloadName) {
+ _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType,
anchorThread, workloadName));
_threadResourceSnapshot.reset();
}
@@ -136,6 +143,7 @@ public class CPUMemThreadLevelAccountingObjects {
private final String _queryId;
private final int _taskId;
private final Thread _anchorThread;
+ private final TaskType _taskType;
private final String _workloadName;
@@ -143,10 +151,11 @@ public class CPUMemThreadLevelAccountingObjects {
return _taskId == CommonConstants.Accounting.ANCHOR_TASK_ID;
}
- public TaskEntry(String queryId, int taskId, Thread anchorThread, String
workloadName) {
+ public TaskEntry(String queryId, int taskId, TaskType taskType, Thread
anchorThread, String workloadName) {
_queryId = queryId;
_taskId = taskId;
_anchorThread = anchorThread;
+ _taskType = taskType;
_workloadName = workloadName;
}
@@ -162,14 +171,20 @@ public class CPUMemThreadLevelAccountingObjects {
return _anchorThread;
}
+ @Override
+ public TaskType getTaskType() {
+ return _taskType;
+ }
+
+
public String getWorkloadName() {
return _workloadName;
}
@Override
public String toString() {
- return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" +
_taskId + ", _anchorThread=" + _anchorThread
- + ", _workloadName='" + _workloadName + '\'' + '}';
+ return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" +
_taskId + ", _rootThread=" + _anchorThread
+ + ", _taskType=" + _taskType + ", _workloadName=" + _workloadName +
'}';
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 1328e17e621..79f5d671966 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -341,22 +341,23 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
@Override
- public void setupRunner(@Nullable String queryId, String workloadName) {
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType,
+ String workloadName) {
_threadLocalEntry.get()._errorStatus.set(null);
if (queryId != null) {
_threadLocalEntry.get()
- .setThreadTaskStatus(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, Thread.currentThread(),
+ .setThreadTaskStatus(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread(),
workloadName);
}
}
@Override
- public void setupWorker(int taskId, @Nullable ThreadExecutionContext
parentContext) {
+ public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext parentContext) {
_threadLocalEntry.get()._errorStatus.set(null);
if (parentContext != null && parentContext.getQueryId() != null &&
parentContext.getAnchorThread() != null) {
- _threadLocalEntry.get()
- .setThreadTaskStatus(parentContext.getQueryId(), taskId,
parentContext.getAnchorThread(),
- parentContext.getWorkloadName());
+
_threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId,
parentContext.getTaskType(),
+ parentContext.getAnchorThread(), parentContext.getWorkloadName());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 3252f994d53..b5f341a1188 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -141,22 +141,23 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
}
@Override
- public void setupRunner(@Nullable String queryId, String workloadName) {
- CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
_threadLocalEntry.get();
- threadEntry._errorStatus.set(null);
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
+ _threadLocalEntry.get()._errorStatus.set(null);
if (queryId != null) {
- threadEntry.setThreadTaskStatus(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, Thread.currentThread(),
- workloadName);
+ _threadLocalEntry.get()
+ .setThreadTaskStatus(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread(),
+ workloadName);
}
}
@Override
- public void setupWorker(int taskId, @Nullable ThreadExecutionContext
parentContext) {
- CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
_threadLocalEntry.get();
- threadEntry._errorStatus.set(null);
+ public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext parentContext) {
+ _threadLocalEntry.get()._errorStatus.set(null);
if (parentContext != null && parentContext.getQueryId() != null &&
parentContext.getAnchorThread() != null) {
- threadEntry.setThreadTaskStatus(parentContext.getQueryId(), taskId,
parentContext.getAnchorThread(),
- parentContext.getWorkloadName());
+ _threadLocalEntry.get()
+ .setThreadTaskStatus(parentContext.getQueryId(), taskId,
parentContext.getTaskType(),
+ parentContext.getAnchorThread(),
parentContext.getWorkloadName());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 76f8af85b38..d385f29dae1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -268,7 +268,7 @@ public class GroupByDataTableReducer implements
DataTableReducer {
futures[i] = reducerContext.getExecutorService().submit(new
TraceRunnable() {
@Override
public void runJob() {
- _resourceUsageAccountant.setupWorker(taskId, parentContext);
+ _resourceUsageAccountant.setupWorker(taskId,
ThreadExecutionContext.TaskType.SSE, parentContext);
try {
for (DataTable dataTable : reduceGroup) {
boolean nullHandlingEnabled =
_queryContext.isNullHandlingEnabled();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
index 71192ddac33..5cefb3fc374 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;
@@ -70,8 +71,8 @@ public class PerQueryCPUMemAccountantTest {
// New Task
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
workerEntry._threadEntry;
threadEntry._currentThreadTaskStatus.set(
- new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
anchorThread._workerThread,
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+ new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
ThreadExecutionContext.TaskType.SSE,
+ anchorThread._workerThread,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
threadEntry._currentThreadMemoryAllocationSampleBytes = 1500;
Map<String, ? extends QueryResourceTracker> queryResourceTrackerMap =
accountant.getQueryResources();
@@ -100,8 +101,8 @@ public class PerQueryCPUMemAccountantTest {
// New Task
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
workerEntry._threadEntry;
threadEntry._currentThreadTaskStatus.set(
- new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
anchorThread._workerThread,
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+ new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
ThreadExecutionContext.TaskType.SSE,
+ anchorThread._workerThread,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
threadEntry.setToIdle();
Map<String, ? extends QueryResourceTracker> queryResourceTrackerMap =
accountant.getQueryResources();
@@ -135,8 +136,8 @@ public class PerQueryCPUMemAccountantTest {
// New Task
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
workerEntry._threadEntry;
threadEntry._currentThreadTaskStatus.set(
- new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
anchorThread._workerThread,
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+ new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
ThreadExecutionContext.TaskType.SSE,
+ anchorThread._workerThread,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
threadEntry._currentThreadMemoryAllocationSampleBytes = 1500;
accountant.reapFinishedTasks();
@@ -203,8 +204,8 @@ public class PerQueryCPUMemAccountantTest {
// New Task
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
workerEntry._threadEntry;
threadEntry._currentThreadTaskStatus.set(
- new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
anchorThread._workerThread,
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+ new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5,
ThreadExecutionContext.TaskType.SSE,
+ anchorThread._workerThread,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
threadEntry._currentThreadMemoryAllocationSampleBytes = 1500;
accountant.reapFinishedTasks();
@@ -249,8 +250,8 @@ public class PerQueryCPUMemAccountantTest {
// New Task
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
workerEntry._threadEntry;
threadEntry._currentThreadTaskStatus.set(
- new CPUMemThreadLevelAccountingObjects.TaskEntry(newQueryId, 5,
anchorThread._workerThread,
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+ new CPUMemThreadLevelAccountingObjects.TaskEntry(newQueryId, 5,
ThreadExecutionContext.TaskType.SSE,
+ anchorThread._workerThread,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
threadEntry._currentThreadMemoryAllocationSampleBytes = 3500;
accountant.reapFinishedTasks();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
index 2f2a29bb8b3..ba46611343a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.accounting;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -42,7 +43,8 @@ class TestResourceAccountant extends
PerQueryCPUMemAccountantFactory.PerQueryCPU
CPUMemThreadLevelAccountingObjects.ThreadEntry anchorEntry = new
CPUMemThreadLevelAccountingObjects.ThreadEntry();
anchorEntry._currentThreadTaskStatus.set(
new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID,
- anchorThread._workerThread,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+ ThreadExecutionContext.TaskType.SSE, anchorThread._workerThread,
+ CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
anchorEntry._currentThreadMemoryAllocationSampleBytes = 1000;
threadEntries.put(anchorThread._workerThread, anchorEntry);
@@ -57,8 +59,9 @@ class TestResourceAccountant extends
PerQueryCPUMemAccountantFactory.PerQueryCPU
private static TaskThread getTaskThread(String queryId, int taskId,
CountDownLatch threadLatch, Thread anchorThread) {
CPUMemThreadLevelAccountingObjects.ThreadEntry worker1 = new
CPUMemThreadLevelAccountingObjects.ThreadEntry();
- worker1._currentThreadTaskStatus.set(new
CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId, anchorThread,
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+ worker1._currentThreadTaskStatus.set(
+ new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId,
ThreadExecutionContext.TaskType.SSE,
+ anchorThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
Thread workerThread1 = new Thread(() -> {
try {
threadLatch.await();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index d7a0ad23503..18d34f35c67 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
@@ -82,7 +83,8 @@ public class OpChainSchedulerService {
// try-with-resources to ensure that the operator chain is closed
// TODO: Change the code so we ownership is expressed in the code in a
better way
try (OpChain closeMe = operatorChain) {
-
Tracing.ThreadAccountantOps.setupWorker(operatorChain.getId().getStageId(),
operatorChain.getParentContext());
+
Tracing.ThreadAccountantOps.setupWorker(operatorChain.getId().getStageId(),
+ ThreadExecutionContext.TaskType.MSE,
operatorChain.getParentContext());
LOGGER.trace("({}): Executing", operatorChain);
MseBlock result = operatorChain.getRoot().nextBlock();
while (result.isData()) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index b8fbe6f4145..a3ab6471c93 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -287,7 +287,8 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
String workloadName = QueryOptionsUtils.getWorkloadName(reqMetadata);
//TODO: Verify if this matches with what OOM protection expects. This
method will not block for the query to
// finish, so it may be breaking some of the OOM protection assumptions.
- Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(),
workloadName);
+ Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(),
ThreadExecutionContext.TaskType.MSE,
+ workloadName);
ThreadExecutionContext parentContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
try {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
index c99a684e2d2..a1ee88e56a9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
@@ -98,10 +98,9 @@ public class MultiStageAccountingTest implements ITest {
Tracing.ThreadAccountantOps.startThreadAccountant();
// Setup Thread Context
- Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest",
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
+ Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest",
ThreadExecutionContext.TaskType.MSE, null);
ThreadExecutionContext threadExecutionContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
- Tracing.ThreadAccountantOps.setupWorker(1, threadExecutionContext);
+ Tracing.ThreadAccountantOps.setupWorker(1,
ThreadExecutionContext.TaskType.MSE, threadExecutionContext);
}
@BeforeMethod
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
index b5b81999178..dd17202f644 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
@@ -57,12 +57,10 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.openMocks;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.*;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.*;
+import static org.testng.Assert.*;
public class MultistageResourceUsageAccountingTest implements ITest {
@@ -99,10 +97,9 @@ public class MultistageResourceUsageAccountingTest
implements ITest {
Tracing.ThreadAccountantOps.startThreadAccountant();
// Setup Thread Context
- Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest",
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
+ Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest",
ThreadExecutionContext.TaskType.MSE, null);
ThreadExecutionContext threadExecutionContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
- Tracing.ThreadAccountantOps.setupWorker(1, threadExecutionContext);
+ Tracing.ThreadAccountantOps.setupWorker(1,
ThreadExecutionContext.TaskType.MSE, threadExecutionContext);
}
@BeforeMethod
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 58a5ad886b9..dfb88fe10c8 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -177,8 +177,7 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
for (Map.Entry<QueryServerInstance, List<Integer>> entry :
dispatchableStagePlan.getServerInstanceToWorkerIdMap()
.entrySet()) {
QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey());
- Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId),
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
+ Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId),
ThreadExecutionContext.TaskType.MSE, null);
ThreadExecutionContext parentContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
List<WorkerMetadata> workerMetadataList =
entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList());
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
index 63c900fac03..9aa18b5b230 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
@@ -23,6 +23,17 @@ package org.apache.pinot.spi.accounting;
*/
public interface ThreadExecutionContext {
+ /**
+ * SSE: Single Stage Engine
+ * MSE: Multi Stage Engine
+ * UNKNOWN: Default
+ */
+ enum TaskType {
+ SSE,
+ MSE,
+ UNKNOWN
+ }
+
/**
* get query id of the execution context
* @return query id in string
@@ -35,5 +46,7 @@ public interface ThreadExecutionContext {
*/
Thread getAnchorThread();
+ TaskType getTaskType();
+
String getWorkloadName();
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
index 3d169bf6497..d084bf40d41 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
@@ -51,4 +51,6 @@ public interface ThreadResourceTracker {
* @return an int containing the task id.
*/
int getTaskId();
+
+ ThreadExecutionContext.TaskType getTaskType();
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 49301ee6994..ea1a7bf3f5d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -49,16 +49,19 @@ public interface ThreadResourceUsageAccountant {
/**
* Set up the thread execution context for an anchor a.k.a runner thread.
* @param queryId query id string
+ * @param taskType the type of the task - SSE or MSE
* @param workloadName the name of the workload, can be null
*/
- void setupRunner(@Nullable String queryId, String workloadName);
+ void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType
taskType, String workloadName);
/**
* Set up the thread execution context for a worker thread.
* @param taskId a unique task id
+ * @param taskType the type of the task - SSE or MSE
* @param parentContext the parent execution context
*/
- void setupWorker(int taskId, @Nullable ThreadExecutionContext parentContext);
+ void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
+ @Nullable ThreadExecutionContext parentContext);
/**
* get the executon context of current thread
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 3b7561ac23c..5501d180517 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -213,11 +213,12 @@ public class Tracing {
}
@Override
- public void setupRunner(@Nullable String queryId, String workloadName) {
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
}
@Override
- public void setupWorker(int taskId, @Nullable ThreadExecutionContext
parentContext) {
+ public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext parentContext) {
}
@Override
@@ -258,7 +259,12 @@ public class Tracing {
}
public static void setupRunner(String queryId, String workloadName) {
- Tracing.getThreadAccountant().setupRunner(queryId, workloadName);
+ setupRunner(queryId, ThreadExecutionContext.TaskType.SSE, workloadName);
+ }
+
+ public static void setupRunner(String queryId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
+ // Set up the runner thread with the given query ID and workload name
+ Tracing.getThreadAccountant().setupRunner(queryId, taskType,
workloadName);
}
/**
@@ -267,7 +273,17 @@ public class Tracing {
* @param threadExecutionContext Context holds metadata about the query.
*/
public static void setupWorker(int taskId, ThreadExecutionContext
threadExecutionContext) {
- Tracing.getThreadAccountant().setupWorker(taskId,
threadExecutionContext);
+ setupWorker(taskId, ThreadExecutionContext.TaskType.SSE,
threadExecutionContext);
+ }
+
+ /**
+ * Setup metadata of query worker threads.
+ * @param taskId Query task ID of the thread. In SSE, ID is an
incrementing counter. In MSE, id is the stage id.
+ * @param threadExecutionContext Context holds metadata about the query.
+ */
+ public static void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext threadExecutionContext) {
+ Tracing.getThreadAccountant().setupWorker(taskId, taskType,
threadExecutionContext);
}
public static void sample() {
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
index 3020cf4b21b..7266a4a7a9b 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -56,11 +56,13 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
}
@Override
- public void setupRunner(@Nullable String queryId, String workloadName) {
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType,
+ String workloadName) {
}
@Override
- public void setupWorker(int taskId, @Nullable ThreadExecutionContext
parentContext) {
+ public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext parentContext) {
}
@Nullable
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]