This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9d3a671300d Improve broker error messaging when broker is the one
reporting the failure (#16076)
9d3a671300d is described below
commit 9d3a671300d9fae776fb35be0f8b6500827d6c62
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Jun 17 09:05:47 2025 +0200
Improve broker error messaging when broker is the one reporting the failure
(#16076)
---
.../core/query/reduce/GroupByDataTableReducer.java | 2 +-
.../apache/pinot/query/mailbox/ReceivingMailbox.java | 18 ++++++++++++------
.../pinot/query/runtime/blocks/ErrorMseBlock.java | 4 +---
.../pinot/query/service/dispatch/QueryDispatcher.java | 17 +++++++++++------
4 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 1d83f7190da..2072e937deb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -356,7 +356,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
try {
long timeOutMs = reducerContext.getReduceTimeOutMs() -
(System.currentTimeMillis() - start);
if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
- throw new TimeoutException("Timed out in broker reduce phase");
+ throw new TimeoutException("Timed out on broker reduce phase");
}
Throwable t = exception.get();
if (t != null) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 4e9736d4f5b..6588e1cf76a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -61,8 +61,8 @@ public class ReceivingMailbox {
public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
private static final Logger LOGGER =
LoggerFactory.getLogger(ReceivingMailbox.class);
- private static final MseBlockWithStats CANCELLED_ERROR_BLOCK = new MseBlockWithStats(
- ErrorMseBlock.fromException(new RuntimeException("Cancelled by receiver")), Collections.emptyList());
+ // This was previously a static final attribute, but now that includes server and stage, we cannot use constants
+ private volatile MseBlockWithStats _cancelledErrorBlock;
private final String _id;
// TODO: Make the queue size configurable
@@ -152,7 +152,7 @@ public class ReceivingMailbox {
MseBlockWithStats errorBlock = _errorBlock.get();
if (errorBlock != null) {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
- return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
+ return errorBlock == _cancelledErrorBlock ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
}
if (timeoutMs <= 0) {
LOGGER.debug("Mailbox: {} is already timed out", _id);
@@ -177,7 +177,7 @@ public class ReceivingMailbox {
} else {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
_blocks.clear();
- return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
+ return errorBlock == _cancelledErrorBlock ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
}
} else {
LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", _id, timeoutMs);
@@ -233,8 +233,14 @@ public class ReceivingMailbox {
*/
public void cancel() {
LOGGER.debug("Cancelling mailbox: {}", _id);
- if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) {
- _blocks.clear();
+ if (_errorBlock.get() == null) {
+ MseBlockWithStats errorBlock = new MseBlockWithStats(
+ ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, "Cancelled by receiver"),
+ Collections.emptyList());
+ if (_errorBlock.compareAndSet(null, errorBlock)) {
+ _cancelledErrorBlock = errorBlock;
+ _blocks.clear();
+ }
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java
index e07dad85819..6ae8b15e1d8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java
@@ -61,15 +61,13 @@ public class ErrorMseBlock implements MseBlock.Eos {
public static ErrorMseBlock fromMap(Map<QueryErrorCode, String>
errorMessages) {
int stage;
int worker;
- String server;
+ String server = QueryThreadContext.isInitialized() ? QueryThreadContext.getInstanceId() : "unknown";
if (MseWorkerThreadContext.isInitialized()) {
stage = MseWorkerThreadContext.getStageId();
worker = MseWorkerThreadContext.getWorkerId();
- server = QueryThreadContext.getInstanceId();
} else {
stage = -1; // Default value when not initialized
worker = -1; // Default value when not initialized
- server = null; // Default value when not initialized
}
return new ErrorMseBlock(stage, worker, server, errorMessages);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index c0c31e034af..d7ef6790d25 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -64,6 +64,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.DataBlockExtractUtils;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
+import org.apache.pinot.query.MseWorkerThreadContext;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
@@ -614,7 +615,11 @@ public class QueryDispatcher {
ArrayList<Object[]> resultRows = new ArrayList<>();
MseBlock block;
MultiStageQueryStats queryStats;
- try (OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) {
+ try (
+ QueryThreadContext.CloseableContext mseCloseableCtx = MseWorkerThreadContext.open();
+ OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) {
+ MseWorkerThreadContext.setStageId(0);
+ MseWorkerThreadContext.setWorkerId(0);
MultiStageOperator rootOperator = opChain.getRoot();
block = rootOperator.nextBlock();
while (block.isData()) {
@@ -648,21 +653,21 @@ public class QueryDispatcher {
Map.Entry<QueryErrorCode, String> error;
String from;
if (errorBlock.getStageId() >= 0) {
- from = "from stage " + errorBlock.getStageId();
+ from = " from stage " + errorBlock.getStageId();
if (errorBlock.getServerId() != null) {
- from += " on server " + errorBlock.getServerId();
+ from += " on " + errorBlock.getServerId();
}
} else {
- from = "from servers";
+ from = "";
}
if (queryExceptions.size() == 1) {
error = queryExceptions.entrySet().iterator().next();
- errorMessage = "Received 1 error " + from + ": " + error.getValue();
+ errorMessage = "Received 1 error" + from + ": " + error.getValue();
} else {
error = queryExceptions.entrySet().stream()
.max(QueryDispatcher::compareErrors)
.orElseThrow();
- errorMessage = "Received " + queryExceptions.size() + " errors " + from + ". "
+ errorMessage = "Received " + queryExceptions.size() + " errors" + from + ". "
+ "The one with highest priority is: " + error.getValue();
}
QueryProcessingException processingEx = new
QueryProcessingException(error.getKey().getId(), errorMessage);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]