This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a5ae4d738a [Multi-stage] Fix SortedMailboxReceiveOperator to not pull
2 EOS blocks (#12406)
a5ae4d738a is described below
commit a5ae4d738a0e646d500b6bb3e2031f7227f035e2
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Feb 14 00:03:02 2024 -0800
[Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks
(#12406)
---
.../runtime/operator/BaseMailboxReceiveOperator.java | 10 +---------
.../query/runtime/operator/MailboxReceiveOperator.java | 4 ++--
.../runtime/operator/SortedMailboxReceiveOperator.java | 17 +++++++++++------
3 files changed, 14 insertions(+), 17 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 808caba04e..a88e122739 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -47,7 +47,7 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
protected final MailboxService _mailboxService;
protected final RelDistribution.Type _exchangeType;
protected final List<String> _mailboxIds;
- private final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
+ protected final BlockingMultiStreamConsumer.OfTransferableBlock
_multiConsumer;
public BaseMailboxReceiveOperator(OpChainExecutionContext context,
RelDistribution.Type exchangeType,
int senderStageId) {
@@ -73,14 +73,6 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
new BlockingMultiStreamConsumer.OfTransferableBlock(context.getId(),
context.getDeadlineMs(), asyncStreams);
}
- protected BlockingMultiStreamConsumer.OfTransferableBlock getMultiConsumer()
{
- return _multiConsumer;
- }
-
- public List<String> getMailboxIds() {
- return _mailboxIds;
- }
-
@Override
protected void earlyTerminate() {
_isEarlyTerminated = true;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad7913cdc1..584b49640f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -42,13 +42,13 @@ public class MailboxReceiveOperator extends
BaseMailboxReceiveOperator {
@Override
protected TransferableBlock getNextBlock() {
- TransferableBlock block = getMultiConsumer().readBlockBlocking();
+ TransferableBlock block = _multiConsumer.readBlockBlocking();
// When early termination flag is set, caller is expecting an EOS block to
be returned, however since the 2 stages
// between sending/receiving mailbox are setting early termination flag
asynchronously, there's chances that the
// next block pulled out of the ReceivingMailbox to be an already buffered
normal data block. This requires the
// MailboxReceiveOperator to continue pulling and dropping data block
until an EOS block is observed.
while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
- block = getMultiConsumer().readBlockBlocking();
+ block = _multiConsumer.readBlockBlocking();
}
return block;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
index 05804c12d3..8949ad569a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -52,7 +52,7 @@ public class SortedMailboxReceiveOperator extends
BaseMailboxReceiveOperator {
private final boolean _isSortOnSender;
private final List<Object[]> _rows = new ArrayList<>();
- private boolean _isSortedBlockConstructed;
+ private TransferableBlock _eosBlock;
public SortedMailboxReceiveOperator(OpChainExecutionContext context,
RelDistribution.Type exchangeType,
DataSchema dataSchema, List<RexExpression> collationKeys,
List<Direction> collationDirections,
@@ -74,20 +74,25 @@ public class SortedMailboxReceiveOperator extends
BaseMailboxReceiveOperator {
@Override
protected TransferableBlock getNextBlock() {
- while (true) { // loop in order to keep asking if we receive data blocks
- TransferableBlock block = getMultiConsumer().readBlockBlocking();
+ if (_eosBlock != null) {
+ return _eosBlock;
+ }
+ // Collect all the rows from the mailbox and sort them
+ while (true) {
+ TransferableBlock block = _multiConsumer.readBlockBlocking();
if (block.isDataBlock()) {
_rows.addAll(block.getContainer());
} else if (block.isErrorBlock()) {
return block;
} else {
assert block.isSuccessfulEndOfStreamBlock();
-
- if (!_isSortedBlockConstructed && !_rows.isEmpty()) {
+ if (!_rows.isEmpty()) {
+ _eosBlock = block;
+ // TODO: This might not be efficient because we are sorting all the
received rows. We should use a k-way merge
+ // when sender side is sorted.
_rows.sort(
new SortUtils.SortComparator(_collationKeys,
_collationDirections, _collationNullDirections, _dataSchema,
false));
- _isSortedBlockConstructed = true;
return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW);
} else {
return block;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]