Jackie-Jiang commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1293157495
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -118,46 +118,52 @@ public String toExplainString() {
@Override
protected TransferableBlock getNextBlock() {
- boolean canContinue = true;
TransferableBlock transferableBlock;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("==[SEND]== Enter getNextBlock from: " + _context.getId());
+ }
try {
transferableBlock = _sourceOperator.nextBlock();
- if (transferableBlock.isNoOpBlock()) {
- return transferableBlock;
- } else if (transferableBlock.isEndOfStreamBlock()) {
- if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
- // Stats need to be populated here because the block is being sent
to the mailbox
- // and the receiving opChain will not be able to access the stats
from the previous opChain
- TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
-
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
- sendTransferableBlock(eosBlockWithStats);
- } else {
- sendTransferableBlock(transferableBlock);
- }
- } else { // normal blocks
- // check whether we should continue depending on exchange queue
condition.
- canContinue = sendTransferableBlock(transferableBlock);
+ if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+ // Stats need to be populated here because the block is being sent to
the mailbox
+ // and the receiving opChain will not be able to access the stats from
the previous opChain
+ TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
+
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+ sendTransferableBlock(eosBlockWithStats, false);
+ } else {
+ sendTransferableBlock(transferableBlock, true);
}
} catch (Exception e) {
transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
try {
LOGGER.error("Exception while transferring data on opChain: " +
_context.getId(), e);
- sendTransferableBlock(transferableBlock);
+ sendTransferableBlock(transferableBlock, false);
} catch (Exception e2) {
LOGGER.error("Exception while sending error block.", e2);
}
}
- // yield if we cannot continue to put transferable block into the sending
queue
- return canContinue ? transferableBlock :
TransferableBlockUtils.getNoOpTransferableBlock();
+ return transferableBlock;
}
- private boolean sendTransferableBlock(TransferableBlock block)
+ private void sendTransferableBlock(TransferableBlock block, boolean
throwIfTimeout)
Review Comment:
IIUC, if it times out, we want to terminate the operator early, so throwing
an exception is the fastest option. Can you confirm, @walterddr?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]