This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c62784c53 [multistage][refactor] Refactor RelNode to StageNode 
conversion (#10730)
6c62784c53 is described below

commit 6c62784c5382d689099617d4020832d603761c46
Author: Xiang Fu <[email protected]>
AuthorDate: Sat May 6 16:58:04 2023 -0700

    [multistage][refactor] Refactor RelNode to StageNode conversion (#10730)
    
    * Convert RelNode to StageNode in multiple steps
    
    * Merge exchange splitter and stageId assigner into a StageFragmenter
---
 .../query/planner/ExplainPlanStageVisitor.java     |   6 +
 .../query/planner/logical/RelToStageConverter.java |  25 ++++
 .../planner/logical/ShuffleRewriteVisitor.java     |   6 +
 .../query/planner/logical/StageFragmenter.java     | 141 +++++++++++++++++++++
 .../pinot/query/planner/logical/StagePlanner.java  |  80 ++----------
 .../planner/physical/DispatchablePlanVisitor.java  |   6 +
 .../colocated/GreedyShuffleRewriteVisitor.java     |   6 +
 .../query/planner/stage/AbstractStageNode.java     |   7 +-
 .../stage/DefaultPostOrderTraversalVisitor.java    |   6 +
 .../pinot/query/planner/stage/ExchangeNode.java    |  93 ++++++++++++++
 .../query/planner/stage/MailboxReceiveNode.java    |   4 +
 .../pinot/query/planner/stage/MailboxSendNode.java |   4 +
 .../pinot/query/planner/stage/StageNode.java       |   2 +
 .../query/planner/stage/StageNodeSerDeUtils.java   |   3 +
 .../query/planner/stage/StageNodeVisitor.java      |   2 +
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   6 +
 .../runtime/plan/ServerRequestPlanVisitor.java     |   6 +
 17 files changed, 333 insertions(+), 70 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
index 325c1cd7ad..e639701256 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
 import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -129,6 +130,11 @@ public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder,
     return context._builder;
   }
 
+  @Override
+  public StringBuilder visitExchange(ExchangeNode node, Context context) {
+    // Explain is expected to run on the fragmented plan, where each ExchangeNode has become a mailbox send/receive pair.
+    throw new UnsupportedOperationException("ExchangeNode should not be visited");
+  }
+
   @Override
   public StringBuilder visitFilter(FilterNode node, Context context) {
     return visitSimpleNode(node, context);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 576f209e8c..ea0c46b390 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -20,10 +20,14 @@ package org.apache.pinot.query.planner.logical;
 
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.SortExchange;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -32,6 +36,7 @@ import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
@@ -40,6 +45,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
@@ -88,11 +94,30 @@ public final class RelToStageConverter {
       return convertLogicalWindow((LogicalWindow) node, currentStageId);
     } else if (node instanceof SetOp) {
       return convertLogicalSetOp((SetOp) node, currentStageId);
+    } else if (node instanceof Exchange) {
+      return convertLogicalExchange((Exchange) node, currentStageId);
     } else {
       throw new UnsupportedOperationException("Unsupported logical plan node: 
" + node);
     }
   }
 
+  /**
+   * Converts a Calcite {@link Exchange} into an {@link ExchangeNode}. Field collations are only read from a
+   * {@link SortExchange}, and the sort-on-sender/receiver flags only from a {@link PinotLogicalSortExchange}; any other
+   * exchange yields {@code null} collations and both flags {@code false}.
+   */
+  private static StageNode convertLogicalExchange(Exchange node, int currentStageId) {
+    List<RelFieldCollation> fieldCollations = null;
+    boolean sortOnSender = false;
+    boolean sortOnReceiver = false;
+    if (node instanceof SortExchange) {
+      RelCollation collation = ((SortExchange) node).getCollation();
+      fieldCollations = collation == null ? null : collation.getFieldCollations();
+      if (node instanceof PinotLogicalSortExchange) {
+        // These flags only take meaning if the collation is not null or empty
+        PinotLogicalSortExchange pinotSortExchange = (PinotLogicalSortExchange) node;
+        sortOnSender = pinotSortExchange.isSortOnSender();
+        sortOnReceiver = pinotSortExchange.isSortOnReceiver();
+      }
+    }
+    return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), node.getDistribution(), fieldCollations,
+        sortOnSender, sortOnReceiver);
+  }
+
   private static StageNode convertLogicalSetOp(SetOp node, int currentStageId) 
{
     return new SetOpNode(SetOpNode.SetOpType.fromObject(node), currentStageId, 
toDataSchema(node.getRowType()),
         node.all);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
index 6f5376c1f5..664ddd44ff 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -87,6 +88,11 @@ public class ShuffleRewriteVisitor implements 
StageNodeVisitor<Set<Integer>, Voi
     return newPartitionKeys;
   }
 
+  @Override
+  public Set<Integer> visitExchange(ExchangeNode node, Void context) {
+    // Partition-key propagation through an ExchangeNode is not implemented by this visitor.
+    throw new UnsupportedOperationException("Exchange not yet supported!");
+  }
+
   @Override
   public Set<Integer> visitFilter(FilterNode node, Void context) {
     // filters don't change partition keys
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
new file mode 100644
index 0000000000..e33638d824
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SetOpNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
+
+
+/**
+ * Fragments a {@link StageNode} tree into stages.
+ *
+ * <p>Each {@link ExchangeNode} is a stage boundary: it is replaced by a {@link MailboxReceiveNode} that stays in the
+ * stage currently being built and a {@link MailboxSendNode} that roots a newly allocated stage containing the
+ * exchange input. Every other node is assigned the id of the stage currently being built. Stage ids are allocated in
+ * depth-first pre-order, starting from 1.
+ *
+ * <p>The visitor holds no state of its own; all bookkeeping lives in the {@link Context} passed to it, so
+ * {@link #INSTANCE} can be shared.
+ */
+public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmenter.Context> {
+  public static final StageFragmenter INSTANCE = new StageFragmenter();
+
+  /**
+   * Assigns the current stage id to {@code node} and fragments its inputs in place.
+   *
+   * <p>Every input starts out in the node's own stage, and {@code context._currentStageId} is the same on return as
+   * on entry. This keeps a non-exchange input from inheriting a stage opened by a preceding exchange input.
+   */
+  private StageNode process(StageNode node, Context context) {
+    int stageId = context._currentStageId;
+    node.setStageId(stageId);
+    List<StageNode> inputs = node.getInputs();
+    for (int i = 0; i < inputs.size(); i++) {
+      context._currentStageId = stageId;
+      inputs.set(i, inputs.get(i).visit(this, context));
+    }
+    context._currentStageId = stageId;
+    return node;
+  }
+
+  @Override
+  public StageNode visitAggregate(AggregateNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitFilter(FilterNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitJoin(JoinNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitMailboxReceive(MailboxReceiveNode node, Context context) {
+    throw new UnsupportedOperationException("MailboxReceiveNode should not be visited by StageFragmenter");
+  }
+
+  @Override
+  public StageNode visitMailboxSend(MailboxSendNode node, Context context) {
+    throw new UnsupportedOperationException("MailboxSendNode should not be visited by StageFragmenter");
+  }
+
+  @Override
+  public StageNode visitProject(ProjectNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitSort(SortNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitTableScan(TableScanNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitValue(ValueNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitWindow(WindowNode node, Context context) {
+    return process(node, context);
+  }
+
+  @Override
+  public StageNode visitSetOp(SetOpNode node, Context context) {
+    return process(node, context);
+  }
+
+  /**
+   * Replaces {@code node} with a mailbox receive/send pair. The receiver stays in the stage currently being built;
+   * the sender roots a new stage that holds the recursively fragmented exchange input.
+   */
+  @Override
+  public StageNode visitExchange(ExchangeNode node, Context context) {
+    int receiverStageId = context._currentStageId;
+    int senderStageId = context._nextStageId++;
+
+    // Build the sender's stage from the exchange input, then return to the receiver's stage so that siblings and
+    // ancestors keep their own stage id. A directly nested exchange sees senderStageId as its receiver stage.
+    context._currentStageId = senderStageId;
+    StageNode nextStageRoot = node.getInputs().get(0).visit(this, context);
+    context._currentStageId = receiverStageId;
+
+    List<Integer> distributionKeys = node.getDistributionKeys();
+    RelDistribution.Type exchangeType = node.getDistributionType();
+
+    // make an exchange sender and receiver node pair
+    // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
+    // of exchange will not carry a partition key selector.
+    KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+        ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+    StageNode mailboxSender = new MailboxSendNode(senderStageId, nextStageRoot.getDataSchema(),
+        receiverStageId, exchangeType, keySelector, node.getCollations(), node.isSortOnSender());
+    StageNode mailboxReceiver = new MailboxReceiveNode(receiverStageId, nextStageRoot.getDataSchema(),
+        senderStageId, exchangeType, keySelector,
+        node.getCollations(), node.isSortOnSender(), node.isSortOnReceiver(), mailboxSender);
+    mailboxSender.addInput(nextStageRoot);
+
+    return mailboxReceiver;
+  }
+
+  /** Mutable bookkeeping for a single fragmentation pass; use a fresh instance per plan. */
+  public static class Context {
+
+    // Stage ID starts with 1, 0 will be reserved for ROOT stage.
+    // Id of the stage currently being built; assigned to every non-exchange node that is visited.
+    int _currentStageId = 1;
+    // Next unused stage id; each ExchangeNode takes one for its sender stage.
+    int _nextStageId = 2;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index ce0417ea0f..91e1ba7a89 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -20,18 +20,12 @@ package org.apache.pinot.query.planner.logical;
 
 import java.util.List;
 import java.util.Set;
-import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.core.Exchange;
-import org.apache.calcite.rel.core.SortExchange;
-import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
 import org.apache.pinot.query.planner.physical.DispatchablePlanVisitor;
 import 
org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
@@ -50,7 +44,6 @@ public class StagePlanner {
   private final PlannerContext _plannerContext;   // DO NOT REMOVE.
   private final WorkerManager _workerManager;
   private final TableCache _tableCache;
-  private int _stageIdCounter;
   private long _requestId;
 
   public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager, long requestId,
@@ -69,11 +62,12 @@ public class StagePlanner {
    */
   public QueryPlan makePlan(RelRoot relRoot, Set<String> tableNames) {
     RelNode relRootNode = relRoot.rel;
-    // Stage ID starts with 1, 0 will be reserved for ROOT stage.
-    _stageIdCounter = 1;
 
-    // walk the plan and create stages.
-    StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
+    // Walk through RelNode tree and construct a StageNode tree.
+    StageNode globalStageRoot = relNodeToStageNode(relRootNode);
+
+    // Fragment the stage tree into multiple stages.
+    globalStageRoot = globalStageRoot.visit(StageFragmenter.INSTANCE, new 
StageFragmenter.Context());
 
     // global root needs to send results back to the ROOT, a.k.a. the client 
response node. the last stage only has one
     // receiver so doesn't matter what the exchange type is. setting it to 
SINGLETON by default.
@@ -99,31 +93,13 @@ public class StagePlanner {
 
   // non-threadsafe
   // TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
-  private StageNode walkRelPlan(RelNode node, int currentStageId) {
-    if (isExchangeNode(node)) {
-      StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
-      RelDistribution distribution = ((Exchange) node).getDistribution();
-      RelCollation collation = null;
-      boolean isSortOnSender = false;
-      boolean isSortOnReceiver = false;
-      if (isSortExchangeNode(node)) {
-        collation = ((SortExchange) node).getCollation();
-        if (node instanceof PinotLogicalSortExchange) {
-          // These flags only take meaning if the collation is not null or 
empty
-          isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
-          isSortOnReceiver = ((PinotLogicalSortExchange) 
node).isSortOnReceiver();
-        }
-      }
-      return createSendReceivePair(nextStageRoot, distribution, collation, 
isSortOnSender, isSortOnReceiver,
-          currentStageId);
-    } else {
-      StageNode stageNode = RelToStageConverter.toStageNode(node, 
currentStageId);
-      List<RelNode> inputs = node.getInputs();
-      for (RelNode input : inputs) {
-        stageNode.addInput(walkRelPlan(input, currentStageId));
-      }
-      return stageNode;
+  // Recursively mirrors the RelNode tree as a StageNode tree. Stage ids are not known yet, so every node receives the
+  // placeholder -1; StageFragmenter assigns the real ids afterwards.
+  private StageNode relNodeToStageNode(RelNode relNode) {
+    StageNode converted = RelToStageConverter.toStageNode(relNode, -1);
+    for (RelNode child : relNode.getInputs()) {
+      converted.addInput(relNodeToStageNode(child));
     }
+    return converted;
   }
 
   // TODO: Switch to Worker SPI to avoid multiple-places where workers are 
assigned.
@@ -132,38 +108,4 @@ public class StagePlanner {
       GreedyShuffleRewriteVisitor.optimizeShuffles(queryPlan, _tableCache);
     }
   }
-
-  private StageNode createSendReceivePair(StageNode nextStageRoot, 
RelDistribution distribution, RelCollation collation,
-      boolean isSortOnSender, boolean isSortOnReceiver, int currentStageId) {
-    List<Integer> distributionKeys = distribution.getKeys();
-    RelDistribution.Type exchangeType = distribution.getType();
-
-    // make an exchange sender and receiver node pair
-    // only HASH_DISTRIBUTED requires a partition key selector; so all other 
types (SINGLETON and BROADCAST)
-    // of exchange will not carry a partition key selector.
-    KeySelector<Object[], Object[]> keySelector = exchangeType == 
RelDistribution.Type.HASH_DISTRIBUTED
-        ? new FieldSelectionKeySelector(distributionKeys) : null;
-
-    StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), 
nextStageRoot.getDataSchema(),
-        currentStageId, exchangeType, keySelector, collation == null ? null : 
collation.getFieldCollations(),
-        isSortOnSender);
-    StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getDataSchema(),
-        nextStageRoot.getStageId(), exchangeType, keySelector,
-        collation == null ? null : collation.getFieldCollations(), 
isSortOnSender, isSortOnReceiver, mailboxSender);
-    mailboxSender.addInput(nextStageRoot);
-
-    return mailboxReceiver;
-  }
-
-  private boolean isExchangeNode(RelNode node) {
-    return (node instanceof Exchange);
-  }
-
-  private boolean isSortExchangeNode(RelNode node) {
-    return (node instanceof SortExchange);
-  }
-
-  private int getNewStageId() {
-    return _stageIdCounter++;
-  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index 82a1afe936..5258fc5637 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.physical;
 
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -118,6 +119,11 @@ public class DispatchablePlanVisitor implements 
StageNodeVisitor<Void, Dispatcha
     return null;
   }
 
+  @Override
+  public Void visitExchange(ExchangeNode node, DispatchablePlanContext context) {
+    // Expected to run only on the fragmented plan, where ExchangeNodes have been split into mailbox pairs.
+    throw new UnsupportedOperationException("ExchangeNode should not be visited by DispatchablePlanVisitor");
+  }
+
   @Override
   public Void visitFilter(FilterNode node, DispatchablePlanContext context) {
     node.getInputs().get(0).visit(this, context);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 4f89435a6e..6d58bde3af 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
 import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -257,6 +258,11 @@ public class GreedyShuffleRewriteVisitor implements 
StageNodeVisitor<Set<Colocat
     return ImmutableSet.of();
   }
 
+  @Override
+  public Set<ColocationKey> visitExchange(ExchangeNode node, GreedyShuffleRewriteContext context) {
+    // Shuffle optimization is expected to see the fragmented plan, which no longer contains ExchangeNodes.
+    throw new UnsupportedOperationException("ExchangeNode should not be visited by this visitor");
+  }
+
   @Override
   public Set<ColocationKey> visitTableScan(TableScanNode node, 
GreedyShuffleRewriteContext context) {
     TableConfig tableConfig = _tableCache.getTableConfig(node.getTableName());
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
index 46de8731b7..c2f0e4b1be 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
@@ -28,7 +28,7 @@ import 
org.apache.pinot.query.planner.serde.ProtoSerializationUtils;
 
 public abstract class AbstractStageNode implements StageNode, 
ProtoSerializable {
 
-  protected final int _stageId;
+  protected int _stageId;
   protected final List<StageNode> _inputs;
   protected DataSchema _dataSchema;
 
@@ -47,6 +47,11 @@ public abstract class AbstractStageNode implements 
StageNode, ProtoSerializable
     return _stageId;
   }
 
+  /**
+   * Replaces this node's stage id, e.g. when ids are assigned after the node tree has been built.
+   */
+  @Override
+  public void setStageId(int newStageId) {
+    _stageId = newStageId;
+  }
+
   @Override
   public List<StageNode> getInputs() {
     return _inputs;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
index 5e8f604bf0..4c93f9d289 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
@@ -90,4 +90,10 @@ public abstract class DefaultPostOrderTraversalVisitor<T, C> 
implements StageNod
     node.getInputs().forEach(input -> input.visit(this, context));
     return process(node, context);
   }
+
+  @Override
+  public T visitExchange(ExchangeNode node, C context) {
+    // Post-order: every input is visited before the exchange itself is processed.
+    for (StageNode input : node.getInputs()) {
+      input.visit(this, context);
+    }
+    return process(node, context);
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
new file mode 100644
index 0000000000..328dd8568c
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+/**
+ * Planner-level node that marks a data exchange between stages.
+ *
+ * <p>It carries the distribution type and keys of a Calcite {@code RelDistribution} plus optional sort collations
+ * and sort-on-sender/receiver flags. Stage fragmentation is expected to replace it with a
+ * {@link MailboxSendNode}/{@link MailboxReceiveNode} pair before the plan is explained, dispatched or executed.
+ */
+public class ExchangeNode extends AbstractStageNode {
+
+  // Distribution type of the exchange (e.g. HASH_DISTRIBUTED, SINGLETON, BROADCAST).
+  @ProtoProperties
+  private RelDistribution.Type _exchangeType;
+
+  // Distribution keys taken from the RelDistribution.
+  @ProtoProperties
+  private List<Integer> _keys;
+
+  @ProtoProperties
+  private boolean _isSortOnSender;
+
+  @ProtoProperties
+  private boolean _isSortOnReceiver;
+
+  // Field collations of a sorted exchange; may be null.
+  @ProtoProperties
+  private List<RelFieldCollation> _collations;
+
+  /** Creates a node for {@code stageId} with every exchange property left at its default. */
+  public ExchangeNode(int stageId) {
+    super(stageId);
+  }
+
+  /**
+   * @param stageId stage the node belongs to (may be a placeholder until stages are fragmented)
+   * @param dataSchema output schema of the exchange
+   * @param distribution source of the exchange type and distribution keys
+   * @param collations field collations for a sorted exchange, possibly {@code null}
+   * @param isSortOnSender sort-on-sender flag
+   * @param isSortOnReceiver sort-on-receiver flag
+   */
+  public ExchangeNode(int stageId, DataSchema dataSchema, RelDistribution distribution,
+      List<RelFieldCollation> collations, boolean isSortOnSender, boolean isSortOnReceiver) {
+    super(stageId, dataSchema);
+    _exchangeType = distribution.getType();
+    _keys = distribution.getKeys();
+    _collations = collations;
+    _isSortOnSender = isSortOnSender;
+    _isSortOnReceiver = isSortOnReceiver;
+  }
+
+  @Override
+  public String explain() {
+    return "EXCHANGE";
+  }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitExchange(this, context);
+  }
+
+  /** @return the distribution type of this exchange */
+  public RelDistribution.Type getDistributionType() {
+    return _exchangeType;
+  }
+
+  /** @return the distribution keys of this exchange */
+  public List<Integer> getDistributionKeys() {
+    return _keys;
+  }
+
+  /** @return the sort-on-sender flag */
+  public boolean isSortOnSender() {
+    return _isSortOnSender;
+  }
+
+  /** @return the sort-on-receiver flag */
+  public boolean isSortOnReceiver() {
+    return _isSortOnReceiver;
+  }
+
+  /** @return the field collations, possibly {@code null} */
+  public List<RelFieldCollation> getCollations() {
+    return _collations;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index 97a019741f..13eb8b5296 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -81,6 +81,10 @@ public class MailboxReceiveNode extends AbstractStageNode {
     _sender = sender;
   }
 
+  /**
+   * Sets the id of the stage whose sender feeds this receiver.
+   *
+   * @param senderStageId sender stage id; a primitive {@code int} to match {@link #getSenderStageId()} and
+   *     {@code MailboxSendNode#setReceiverStageId(int)}, so a {@code null} can never be stored or unboxed later
+   */
+  public void setSenderStageId(int senderStageId) {
+    _senderStageId = senderStageId;
+  }
+
   public int getSenderStageId() {
     return _senderStageId;
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index c98b82907a..54f80c6794 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -77,6 +77,10 @@ public class MailboxSendNode extends AbstractStageNode {
     return _receiverStageId;
   }
 
+  /**
+   * Sets the id of the stage this sender delivers its data to.
+   */
+  public void setReceiverStageId(int stageId) {
+    _receiverStageId = stageId;
+  }
+
   public void setExchangeType(RelDistribution.Type exchangeType) {
     _exchangeType = exchangeType;
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
index 7e3278cfe8..ae851f449b 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
@@ -35,6 +35,8 @@ public interface StageNode extends Serializable {
 
   int getStageId();
 
+  void setStageId(int stageId);
+
   List<StageNode> getInputs();
 
   void addInput(StageNode stageNode);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index 45a9b8c1df..f96eadfc06 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -87,6 +87,9 @@ public final class StageNodeSerDeUtils {
         return new WindowNode(stageId);
       case "SetOpNode":
         return new SetOpNode(stageId);
+      case "ExchangeNode":
+        throw new IllegalArgumentException(
+            "ExchangeNode should be already split into MailboxSendNode and 
MailboxReceiveNode");
       default:
         throw new IllegalArgumentException("Unknown node name: " + nodeName);
     }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
index 78acf94c76..f72e9540ec 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
@@ -58,4 +58,6 @@ public interface StageNodeVisitor<T, C> {
   T visitWindow(WindowNode node, C context);
 
   T visitSetOp(SetOpNode setOpNode, C context);
+
+  T visitExchange(ExchangeNode exchangeNode, C context);
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 4ff613e3ec..27c4302197 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.plan;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -130,6 +131,11 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator,
     }
   }
 
+  @Override
+  public MultiStageOperator visitExchange(ExchangeNode node, PlanRequestContext context) {
+    // ExchangeNode is expected to have been split into mailbox send/receive nodes before physical planning.
+    throw new UnsupportedOperationException("ExchangeNode should not be visited");
+  }
+
   @Override
   public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext 
context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index db6053a3c9..512e8717e5 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.parser.CalciteRexExpressionParser;
 import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.ExchangeNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -189,6 +190,11 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
     return null;
   }
 
+  @Override
+  public Void visitExchange(ExchangeNode node, ServerPlanRequestContext context) {
+    // StageNodeSerDeUtils refuses to deserialize an ExchangeNode, so one is not expected in a server-side stage plan.
+    throw new UnsupportedOperationException("Exchange not yet supported!");
+  }
+
   @Override
   public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
     visitChildren(node, context);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to