This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 633c90d43be Fix LeafOperator thread accountant (#16651)
633c90d43be is described below
commit 633c90d43be4c52db784ec7ed0574b9dd3e5fa41
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 22 11:10:40 2025 -0700
Fix LeafOperator thread accountant (#16651)
---
.../pinot/query/runtime/operator/LeafOperator.java | 135 ++++++++++-----------
1 file changed, 61 insertions(+), 74 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 157bcf689e6..2d0939c85e8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -93,6 +93,7 @@ public class LeafOperator extends MultiStageOperator {
private final String _tableName;
private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
private final AtomicReference<ErrorMseBlock> _errorBlock = new AtomicReference<>();
+ private final ResultsBlockStreamer _resultsBlockStreamer = this::addResultsBlock;
// Use a limit-sized BlockingQueue to store the results blocks and apply back pressure to the single-stage threads
@VisibleForTesting
@@ -422,42 +423,19 @@ public class LeafOperator extends MultiStageOperator {
}
@VisibleForTesting
- void execute(ThreadExecutionContext parentContext) {
- ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
- ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
+ void execute(@Nullable ThreadExecutionContext parentContext) {
if (_requests.size() == 1) {
ServerQueryRequest request = _requests.get(0);
- Tracing.ThreadAccountantOps.setupWorker(1, parentContext);
-
- InstanceResponseBlock instanceResponseBlock =
- _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
- if (queryLogger != null) {
- queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
- }
- // Collect the execution stats
- mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
- // TODO: Revisit if we should treat all exceptions as query failure. Currently MERGE_RESPONSE_ERROR and
- // SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
- Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
- if (!exceptions.isEmpty()) {
- setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
- } else {
- // NOTE: Instance response block might contain data (not metadata only) when all the segments are pruned.
- // Add the results block if it contains data.
- BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
- if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
- try {
- addResultsBlock(resultsBlock);
- } catch (InterruptedException e) {
- setErrorBlock(CANCELLED_BLOCK);
- } catch (TimeoutException e) {
- setErrorBlock(TIMEOUT_BLOCK);
- } catch (Exception e) {
- if (!(e instanceof EarlyTerminationException)) {
- LOGGER.warn("Failed to add results block", e);
- }
- }
+ if (parentContext != null) {
+ // NOTE: Treat this as SSE runner (anchor) thread.
+ Tracing.ThreadAccountantOps.setupRunner(parentContext.getQueryId(), parentContext.getWorkloadName());
+ try {
+ executeOneRequest(request, null);
+ } finally {
+ Tracing.ThreadAccountantOps.clear();
}
+ } else {
+ executeOneRequest(request, null);
}
} else {
// Hit 2 physical tables, one REALTIME and one OFFLINE
@@ -469,43 +447,24 @@ public class LeafOperator extends MultiStageOperator {
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
ServerQueryRequest request = _requests.get(i);
- int taskId = i;
futures[i] = _executorService.submit(() -> {
- Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
-
- try {
- InstanceResponseBlock instanceResponseBlock =
- _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
- if (queryLogger != null) {
- queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
+ if (parentContext != null) {
+ // NOTE: Treat this as SSE runner (anchor) thread.
+ Tracing.ThreadAccountantOps.setupRunner(parentContext.getQueryId(), parentContext.getWorkloadName());
+ try {
+ // Drain the latch when receiving exception block and not wait for the other thread to finish
+ executeOneRequest(request, latch::countDown);
+ } finally {
+ Tracing.ThreadAccountantOps.clear();
+ latch.countDown();
}
- // Collect the execution stats
- mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
- Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
- if (!exceptions.isEmpty()) {
- setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
+ } else {
+ try {
// Drain the latch when receiving exception block and not wait for the other thread to finish
+ executeOneRequest(request, latch::countDown);
+ } finally {
latch.countDown();
- } else {
- // NOTE: Instance response block might contain data (not metadata only) when all the segments are
- // pruned. Add the results block if it contains data.
- BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
- if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
- try {
- addResultsBlock(resultsBlock);
- } catch (InterruptedException e) {
- setErrorBlock(CANCELLED_BLOCK);
- } catch (TimeoutException e) {
- setErrorBlock(TIMEOUT_BLOCK);
- } catch (Exception e) {
- if (!(e instanceof EarlyTerminationException)) {
- LOGGER.warn("Failed to add results block", e);
- }
- }
- }
}
- } finally {
- latch.countDown();
}
});
}
@@ -523,6 +482,43 @@ public class LeafOperator extends MultiStageOperator {
}
}
+ private void executeOneRequest(ServerQueryRequest request, @Nullable Runnable onException) {
+ InstanceResponseBlock instanceResponseBlock =
+ _queryExecutor.execute(request, _executorService, _resultsBlockStreamer);
+ ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
+ if (queryLogger != null) {
+ queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
+ }
+ // Collect the execution stats
+ mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
+ // TODO: Revisit if we should treat all exceptions as query failure. Currently MERGE_RESPONSE_ERROR and
+ // SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
+ Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
+ if (!exceptions.isEmpty()) {
+ setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
+ if (onException != null) {
+ onException.run();
+ }
+ } else {
+ // NOTE: Instance response block might contain data (not metadata only) when all the segments are pruned.
+ // Add the results block if it contains data.
+ BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
+ if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
+ try {
+ addResultsBlock(resultsBlock);
+ } catch (InterruptedException e) {
+ setErrorBlock(CANCELLED_BLOCK);
+ } catch (TimeoutException e) {
+ setErrorBlock(TIMEOUT_BLOCK);
+ } catch (Exception e) {
+ if (!(e instanceof EarlyTerminationException)) {
+ LOGGER.warn("Failed to add results block", e);
+ }
+ }
+ }
+ }
+ }
+
@VisibleForTesting
void addResultsBlock(BaseResultsBlock resultsBlock)
throws InterruptedException, TimeoutException {
@@ -670,15 +666,6 @@ public class LeafOperator extends MultiStageOperator {
}
}
- private class ResultsBlockConsumer implements ResultsBlockStreamer {
-
- @Override
- public void send(BaseResultsBlock block)
- throws InterruptedException, TimeoutException {
- addResultsBlock(block);
- }
- }
-
public enum StatKey implements StatMap.Key {
TABLE(StatMap.Type.STRING, null),
EXECUTION_TIME_MS(StatMap.Type.LONG, null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]