This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b0c360aae1 [multistage] Early terminate SortOperator if there is a limit (#11334)
b0c360aae1 is described below
commit b0c360aae186cce8940ce54fffe4532adc7fccfb
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Aug 17 00:41:12 2023 -0700
[multistage] Early terminate SortOperator if there is a limit (#11334)
---
.../src/test/resources/queries/JoinPlans.json | 19 ++++++++
.../pinot/query/mailbox/GrpcSendingMailbox.java | 46 +++++++++++--------
.../query/mailbox/InMemorySendingMailbox.java | 28 +++++++++++-
.../pinot/query/mailbox/ReceivingMailbox.java | 28 +++++++-----
.../apache/pinot/query/mailbox/SendingMailbox.java | 6 +++
.../mailbox/channel/MailboxContentObserver.java | 28 +++++++++---
.../query/runtime/blocks/TransferableBlock.java | 7 ---
.../runtime/operator/MailboxSendOperator.java | 51 ++++++++++++----------
.../pinot/query/runtime/operator/SortOperator.java | 15 ++++---
.../runtime/operator/exchange/BlockExchange.java | 15 ++++++-
.../pinot/query/mailbox/MailboxServiceTest.java | 16 ++++---
11 files changed, 178 insertions(+), 81 deletions(-)
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index 94a53aec08..cf8537c16c 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -402,6 +402,25 @@
"\n LogicalTableScan(table=[[b]])",
"\n"
]
+ },
+ {
+ "description": "Inner join with limit",
+ "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 LIMIT 100",
+ "output": [
+ "Execution Plan",
+ "\nLogicalSort(offset=[0], fetch=[100])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
+ "\n LogicalSort(fetch=[100])",
+ "\n LogicalProject(col1=[$0], ts=[$1], col3=[$3])",
+ "\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col1=[$0], ts=[$6])",
+ "\n LogicalTableScan(table=[[a]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n LogicalTableScan(table=[[b]])",
+ "\n"
+ ]
}
]
},
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 25cc337bf8..47b76b7e12 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.mailbox;
-import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.StreamObserver;
@@ -61,35 +60,47 @@ public class GrpcSendingMailbox implements SendingMailbox {
@Override
public void send(TransferableBlock block)
throws IOException {
+ if (isTerminated()) {
+ return;
+ }
if (_contentObserver == null) {
_contentObserver = getContentObserver();
}
- Preconditions.checkState(!_statusObserver.isFinished(), "Mailbox: %s is already closed", _id);
_contentObserver.onNext(toMailboxContent(block));
}
@Override
public void complete() {
+ if (isTerminated()) {
+ return;
+ }
_contentObserver.onCompleted();
}
@Override
public void cancel(Throwable t) {
- if (!_statusObserver.isFinished()) {
- LOGGER.debug("Cancelling mailbox: {}", _id);
- if (_contentObserver == null) {
- _contentObserver = getContentObserver();
- }
- try {
- // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback
- _contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(
- new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t))));
- _contentObserver.onCompleted();
- } catch (Exception e) {
- // Exception can be thrown if the stream is already closed, so we
simply ignore it
- LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
- }
+ if (isTerminated()) {
+ return;
+ }
+ LOGGER.debug("Cancelling mailbox: {}", _id);
+ if (_contentObserver == null) {
+ _contentObserver = getContentObserver();
}
+ try {
+ // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback
+ _contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(
+ new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t))));
+ _contentObserver.onCompleted();
+ } catch (Exception e) {
+ // Exception can be thrown if the stream is already closed, so we simply ignore it
+ LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
+ }
+ }
+
+ @Override
+ public boolean isTerminated() {
+ // TODO: We cannot differentiate early termination vs stream error
+ return _statusObserver.isFinished();
}
private StreamObserver<MailboxContent> getContentObserver() {
@@ -102,7 +113,6 @@ public class GrpcSendingMailbox implements SendingMailbox {
DataBlock dataBlock = block.getDataBlock();
byte[] bytes = dataBlock.toBytes();
ByteString byteString = UnsafeByteOperations.unsafeWrap(bytes);
- return MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString)
- .build();
+ return MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString).build();
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index fb96d62043..23943a2091 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -32,6 +32,7 @@ public class InMemorySendingMailbox implements SendingMailbox
{
private final long _deadlineMs;
private ReceivingMailbox _receivingMailbox;
+ private volatile boolean _isTerminated;
public InMemorySendingMailbox(String id, MailboxService mailboxService, long
deadlineMs) {
_id = id;
@@ -41,12 +42,27 @@ public class InMemorySendingMailbox implements
SendingMailbox {
@Override
public void send(TransferableBlock block) {
+ if (_isTerminated) {
+ return;
+ }
if (_receivingMailbox == null) {
_receivingMailbox = _mailboxService.getReceivingMailbox(_id);
}
long timeoutMs = _deadlineMs - System.currentTimeMillis();
- if (!_receivingMailbox.offer(block, timeoutMs)) {
- throw new RuntimeException(String.format("Failed to offer block into
mailbox: %s within: %dms", _id, timeoutMs));
+ ReceivingMailbox.ReceivingMailboxStatus status =
_receivingMailbox.offer(block, timeoutMs);
+ switch (status) {
+ case SUCCESS:
+ break;
+ case ERROR:
+ throw new RuntimeException(String.format("Mailbox: %s already errored out (received error block before)", _id));
+ case TIMEOUT:
+ throw new RuntimeException(
+ String.format("Timed out adding block into mailbox: %s with
timeout: %dms", _id, timeoutMs));
+ case EARLY_TERMINATED:
+ _isTerminated = true;
+ break;
+ default:
+ throw new IllegalStateException("Unsupported mailbox status: " +
status);
}
}
@@ -56,6 +72,9 @@ public class InMemorySendingMailbox implements SendingMailbox
{
@Override
public void cancel(Throwable t) {
+ if (_isTerminated) {
+ return;
+ }
LOGGER.debug("Cancelling mailbox: {}", _id);
if (_receivingMailbox == null) {
_receivingMailbox = _mailboxService.getReceivingMailbox(_id);
@@ -63,4 +82,9 @@ public class InMemorySendingMailbox implements SendingMailbox
{
_receivingMailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
new RuntimeException("Cancelled by sender with exception: " +
t.getMessage(), t)));
}
+
+ @Override
+ public boolean isTerminated() {
+ return _isTerminated;
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index fcba7c0a3d..64b7c3f202 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -65,37 +65,41 @@ public class ReceivingMailbox {
* Offers a non-error block into the mailbox within the timeout specified,
returns whether the block is successfully
* added. If the block is not added, an error block is added to the mailbox.
*/
- public boolean offer(TransferableBlock block, long timeoutMs) {
- if (_errorBlock.get() != null) {
+ public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs)
{
+ TransferableBlock errorBlock = _errorBlock.get();
+ if (errorBlock != null) {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring
the late block", _id);
- return false;
+ return errorBlock == CANCELLED_ERROR_BLOCK ?
ReceivingMailboxStatus.EARLY_TERMINATED
+ : ReceivingMailboxStatus.ERROR;
}
- if (timeoutMs < 0) {
+ if (timeoutMs <= 0) {
LOGGER.debug("Mailbox: {} is already timed out", _id);
setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
new TimeoutException("Timed out while offering data to mailbox: " +
_id)));
- return false;
+ return ReceivingMailboxStatus.TIMEOUT;
}
try {
if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
- if (_errorBlock.get() == null) {
+ errorBlock = _errorBlock.get();
+ if (errorBlock == null) {
_receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id));
- return true;
+ return ReceivingMailboxStatus.SUCCESS;
} else {
LOGGER.debug("Mailbox: {} is already cancelled or errored out,
ignoring the late block", _id);
_blocks.clear();
- return false;
+ return errorBlock == CANCELLED_ERROR_BLOCK ?
ReceivingMailboxStatus.EARLY_TERMINATED
+ : ReceivingMailboxStatus.ERROR;
}
} else {
LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms",
_id, timeoutMs);
setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
new TimeoutException("Timed out while waiting for receive operator
to consume data from mailbox: " + _id)));
- return false;
+ return ReceivingMailboxStatus.TIMEOUT;
}
} catch (InterruptedException e) {
LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e));
- return false;
+ return ReceivingMailboxStatus.ERROR;
}
}
@@ -133,4 +137,8 @@ public class ReceivingMailbox {
public int getNumPendingBlocks() {
return _blocks.size();
}
+
+ public enum ReceivingMailboxStatus {
+ SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 3a794260b7..68b4f958cd 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -55,4 +55,10 @@ public interface SendingMailbox {
* No more blocks can be sent after calling this method.
*/
void cancel(Throwable t);
+
+ /**
+ * Returns whether the {@link ReceivingMailbox} is already closed. There is
no need to send more blocks after the
+ * mailbox is terminated.
+ */
+ boolean isTerminated();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index d9509d5766..9074d81151 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -77,13 +77,27 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
}
long timeoutMs =
Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
- if (_mailbox.offer(block, timeoutMs)) {
-
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
- .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
- Integer.toString(_mailbox.getNumPendingBlocks())).build());
- } else {
- LOGGER.warn("Failed to add block into mailbox: {} within timeout:
{}ms", mailboxId, timeoutMs);
- cancelStream();
+ ReceivingMailbox.ReceivingMailboxStatus status = _mailbox.offer(block,
timeoutMs);
+ switch (status) {
+ case SUCCESS:
+
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
+ Integer.toString(_mailbox.getNumPendingBlocks())).build());
+ break;
+ case ERROR:
+ LOGGER.warn("Mailbox: {} already errored out (received error block
before)", mailboxId);
+ cancelStream();
+ break;
+ case TIMEOUT:
+ LOGGER.warn("Timed out adding block into mailbox: {} with timeout:
{}ms", mailboxId, timeoutMs);
+ cancelStream();
+ break;
+ case EARLY_TERMINATED:
+ LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
+ onCompleted();
+ break;
+ default:
+ throw new IllegalStateException("Unsupported mailbox status: " +
status);
}
} catch (Exception e) {
String errorMessage = "Caught exception while processing blocks for
mailbox: " + mailboxId;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 64e4dc31ae..3465f94602 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.runtime.blocks;
-import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,12 +46,6 @@ public class TransferableBlock implements Block {
private List<Object[]> _container;
public TransferableBlock(List<Object[]> container, DataSchema dataSchema,
DataBlock.Type containerType) {
- this(container, dataSchema, containerType, false);
- }
-
- @VisibleForTesting
- TransferableBlock(List<Object[]> container, DataSchema dataSchema,
DataBlock.Type containerType,
- boolean isErrorBlock) {
_container = container;
_dataSchema = dataSchema;
_type = containerType;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index c5e1c71f5f..78bc02351f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -40,6 +40,7 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,37 +119,40 @@ public class MailboxSendOperator extends
MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- boolean canContinue = true;
- TransferableBlock transferableBlock;
try {
- transferableBlock = _sourceOperator.nextBlock();
- if (transferableBlock.isNoOpBlock()) {
- return transferableBlock;
- } else if (transferableBlock.isEndOfStreamBlock()) {
- if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
- // Stats need to be populated here because the block is being sent
to the mailbox
- // and the receiving opChain will not be able to access the stats
from the previous opChain
- TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
-
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
- sendTransferableBlock(eosBlockWithStats);
- } else {
- sendTransferableBlock(transferableBlock);
- }
- } else { // normal blocks
- // check whether we should continue depending on exchange queue
condition.
- canContinue = sendTransferableBlock(transferableBlock);
+ TransferableBlock block = _sourceOperator.nextBlock();
+ if (block.isNoOpBlock()) {
+ return block;
+ } else if (block.isErrorBlock()) {
+ sendTransferableBlock(block);
+ return block;
+ } else if (block.isSuccessfulEndOfStreamBlock()) {
+ // Stats need to be populated here because the block is being sent to
the mailbox
+ // and the receiving opChain will not be able to access the stats from
the previous opChain
+ TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
+
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+ sendTransferableBlock(eosBlockWithStats);
+ return block;
+ } else {
+ // Data block
+ boolean canContinue = sendTransferableBlock(block);
+ // Yield if we cannot continue to put transferable block into the
sending queue
+ return canContinue ? block :
TransferableBlockUtils.getNoOpTransferableBlock();
}
+ } catch (EarlyTerminationException e) {
+ // TODO: Query stats are not sent when opChain is early terminated
+ LOGGER.debug("Early terminating opChain: " + _context.getId());
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
} catch (Exception e) {
- transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+ TransferableBlock errorBlock =
TransferableBlockUtils.getErrorTransferableBlock(e);
try {
LOGGER.error("Exception while transferring data on opChain: " +
_context.getId(), e);
- sendTransferableBlock(transferableBlock);
+ sendTransferableBlock(errorBlock);
} catch (Exception e2) {
LOGGER.error("Exception while sending error block.", e2);
}
+ return errorBlock;
}
- // yield if we cannot continue to put transferable block into the sending
queue
- return canContinue ? transferableBlock :
TransferableBlockUtils.getNoOpTransferableBlock();
}
private boolean sendTransferableBlock(TransferableBlock block)
@@ -157,7 +161,8 @@ public class MailboxSendOperator extends MultiStageOperator
{
if (_exchange.offerBlock(block, timeoutMs)) {
return _exchange.getRemainingCapacity() > 0;
} else {
- throw new TimeoutException("Timeout while offering data block into the
sending queue.");
+ throw new TimeoutException(
+ String.format("Timed out while offering block into the sending queue
after %dms", timeoutMs));
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 0f3fef5208..7e0b6f8e4c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -84,7 +84,7 @@ public class SortOperator extends MultiStageOperator {
// - 'isInputSorted' is set to true indicating that the data was already
sorted
if (collationKeys.isEmpty() || isInputSorted) {
_priorityQueue = null;
- _rows = new ArrayList<>();
+ _rows = new ArrayList<>(Math.min(defaultHolderCapacity, _numRowsToKeep));
} else {
// Use the opposite direction as specified by the collation directions
since we need the PriorityQueue to decide
// which elements to keep and which to remove based on the limits.
@@ -160,7 +160,7 @@ public class SortOperator extends MultiStageOperator {
if (block.isErrorBlock()) {
_upstreamErrorBlock = block;
return;
- } else if (TransferableBlockUtils.isEndOfStream(block)) {
+ } else if (block.isSuccessfulEndOfStreamBlock()) {
_readyToConstruct = true;
return;
}
@@ -168,11 +168,16 @@ public class SortOperator extends MultiStageOperator {
List<Object[]> container = block.getContainer();
if (_priorityQueue == null) {
// TODO: when push-down properly, we shouldn't get more than
_numRowsToKeep
- if (_rows.size() <= _numRowsToKeep) {
- if (_rows.size() + container.size() <= _numRowsToKeep) {
+ int numRows = _rows.size();
+ if (numRows < _numRowsToKeep) {
+ if (numRows + container.size() < _numRowsToKeep) {
_rows.addAll(container);
} else {
- _rows.addAll(container.subList(0, _numRowsToKeep -
_rows.size()));
+ _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
+ LOGGER.debug("Early terminate at SortOperator - operatorId={},
opChainId={}", _operatorId,
+ _context.getId());
+ _readyToConstruct = true;
+ return;
}
}
} else {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index b7a152df02..e47d9835fa 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -35,6 +35,7 @@ import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,6 +95,16 @@ public abstract class BlockExchange {
public boolean offerBlock(TransferableBlock block, long timeoutMs)
throws Exception {
+ boolean isEarlyTerminated = true;
+ for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+ if (!sendingMailbox.isTerminated()) {
+ isEarlyTerminated = false;
+ break;
+ }
+ }
+ if (isEarlyTerminated) {
+ throw new EarlyTerminationException();
+ }
return _queue.offer(block, timeoutMs, TimeUnit.MILLISECONDS);
}
@@ -111,8 +122,8 @@ public abstract class BlockExchange {
}
block = _queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (block == null) {
- block = TransferableBlockUtils.getErrorTransferableBlock(
- new TimeoutException("Timed out on exchange for opChain: " +
_opChainId));
+ block = TransferableBlockUtils.getErrorTransferableBlock(new
TimeoutException(
+ String.format("Timed out polling block for opChain: %s after
%dms", _opChainId, timeoutMs)));
} else {
// Notify that the block exchange can now accept more blocks.
_callback.accept(_opChainId);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index 0b77996c22..9e372b91b9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -506,13 +506,15 @@ public class MailboxServiceTest {
Thread.sleep(deadlineMs - System.currentTimeMillis() + 10);
receiveMailLatch.await();
assertEquals(numCallbacks.get(), 2);
- try {
- sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{1}));
- fail("Expect exception when sending data after timing out");
- } catch (Exception e) {
- // Expected
- }
- assertEquals(numCallbacks.get(), 2);
+ // TODO: Currently we cannot differentiate early termination vs stream error
+ assertTrue(sendingMailbox.isTerminated());
+// try {
+// sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{1}));
+// fail("Expect exception when sending data after timing out");
+// } catch (Exception e) {
+// // Expected
+// }
+// assertEquals(numCallbacks.get(), 2);
// Data blocks will be cleaned up
ReceivingMailbox receivingMailbox =
_mailboxService1.getReceivingMailbox(mailboxId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]