This is an automated email from the ASF dual-hosted git repository.

chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f444fc2908 [multistage] Fix Bugs in SetOp Handling and Multi-Column Join (#16330)
f444fc2908 is described below

commit f444fc2908c8879da1894515aec4315ff55d12cd
Author: Ankit Sultana <[email protected]>
AuthorDate: Fri Jul 11 18:02:52 2025 -0500

    [multistage] Fix Bugs in SetOp Handling and Multi-Column Join (#16330)
    
    * [multistage] Fix Bugs in SetOp Handling and Multi-Column Join
    
    * fix union-all handling
---
 .../pinot/calcite/rel/traits/TraitAssignment.java  | 24 ++++++
 .../planner/physical/v2/RelToPRelConverter.java    |  9 ++-
 .../physical/v2/nodes/PhysicalIntersect.java       | 86 ++++++++++++++++++++++
 .../v2/opt/rules/WorkerExchangeAssignmentRule.java | 26 +++++--
 .../resources/queries/PhysicalOptimizerPlans.json  | 25 +++++++
 .../query/runtime/queries/QueryRunnerTestBase.java |  2 +
 .../runtime/queries/ResourceBasedQueriesTest.java  | 28 ++++++-
 .../src/test/resources/queries/QueryHints.json     |  6 +-
 8 files changed, 196 insertions(+), 10 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
index a7ffe7e482..4ce4a1cf53 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
@@ -33,7 +33,10 @@ import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.query.context.PhysicalPlannerContext;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
@@ -83,10 +86,31 @@ public class TraitAssignment {
       return (PRelNode) assignAggregate((PhysicalAggregate) relNode);
     } else if (relNode instanceof PhysicalWindow) {
       return (PRelNode) assignWindow((PhysicalWindow) relNode);
+    } else if (relNode instanceof SetOp) {
+      return (PRelNode) assignSetOp((SetOp) relNode);
     }
     return (PRelNode) relNode;
   }
 
+  RelNode assignSetOp(SetOp setOp) {
+    if (setOp instanceof Union) {
+      Union union = (Union) setOp;
+      if (union.all) {
+        // UNION ALL means we can return duplicates, so no trait required.
+        return setOp;
+      }
+    }
+    RelDistribution pushedDownDistTrait = 
RelDistributions.hash(ImmutableIntList.range(0,
+        setOp.getRowType().getFieldCount()));
+    List<RelNode> newInputs = new ArrayList<>();
+    for (RelNode input : setOp.getInputs()) {
+      RelTraitSet newTraitSet = input.getTraitSet().plus(pushedDownDistTrait);
+      RelNode newInput = input.copy(newTraitSet, input.getInputs());
+      newInputs.add(newInput);
+    }
+    return setOp.copy(setOp.getTraitSet(), newInputs);
+  }
+
   /**
    * Sort is always computed by coalescing to a single stream. Hence, we add a 
SINGLETON trait to the sort input.
    */
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
index 5e142c2620..09323341b1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AsofJoin;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Intersect;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.Project;
@@ -41,7 +42,9 @@ import org.apache.pinot.query.context.PhysicalPlannerContext;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalFilter;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalIntersect;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalMinus;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
@@ -119,8 +122,12 @@ public class RelToPRelConverter {
           nodeIdGenerator.get(), null);
     } else if (relNode instanceof Minus) {
       Minus minus = (Minus) relNode;
-      return new PhysicalUnion(minus.getCluster(), minus.getTraitSet(), 
minus.getHints(), minus.all, inputs,
+      return new PhysicalMinus(minus.getCluster(), minus.getTraitSet(), 
minus.getHints(), inputs, minus.all,
           nodeIdGenerator.get(), null);
+    } else if (relNode instanceof Intersect) {
+      Intersect intersect = (Intersect) relNode;
+      return new PhysicalIntersect(intersect.getCluster(), 
intersect.getTraitSet(), intersect.getHints(), inputs,
+          intersect.all, nodeIdGenerator.get(), null);
     } else if (relNode instanceof Sort) {
       Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input 
of sort. Found: %s", inputs);
       Sort sort = (Sort) relNode;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalIntersect.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalIntersect.java
new file mode 100644
index 0000000000..f243e5453e
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalIntersect.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.nodes;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
+
+
+public class PhysicalIntersect extends Intersect implements PRelNode {
+  private final int _nodeId;
+  private final List<PRelNode> _pRelInputs;
+  @Nullable
+  private final PinotDataDistribution _pinotDataDistribution;
+
+  public PhysicalIntersect(RelOptCluster cluster, RelTraitSet traits, 
List<RelHint> hints, List<PRelNode> inputs,
+      boolean all, int nodeId, @Nullable PinotDataDistribution 
pinotDataDistribution) {
+    super(cluster, traits, hints, 
inputs.stream().map(PRelNode::unwrap).collect(Collectors.toList()), all);
+    _nodeId = nodeId;
+    _pRelInputs = inputs;
+    _pinotDataDistribution = pinotDataDistribution;
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new PhysicalIntersect(
+        getCluster(), traitSet, getHints(), 
inputs.stream().map(PRelNode.class::cast).collect(Collectors.toList()),
+        all, _nodeId, _pinotDataDistribution);
+  }
+
+  @Override
+  public int getNodeId() {
+    return _nodeId;
+  }
+
+  @Override
+  public List<PRelNode> getPRelInputs() {
+    return _pRelInputs;
+  }
+
+  @Override
+  public RelNode unwrap() {
+    return this;
+  }
+
+  @Nullable
+  @Override
+  public PinotDataDistribution getPinotDataDistribution() {
+    return _pinotDataDistribution;
+  }
+
+  @Override
+  public boolean isLeafStage() {
+    return false;
+  }
+
+  @Override
+  public PRelNode with(int newNodeId, List<PRelNode> newInputs, 
PinotDataDistribution newDistribution) {
+    return new PhysicalIntersect(
+        getCluster(), getTraitSet(), getHints(), newInputs, all, newNodeId, 
newDistribution);
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
index 2f338083bd..15cd43c028 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Values;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
@@ -334,10 +335,18 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
     }
     Preconditions.checkState(relDistribution.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED,
         "Unexpected distribution constraint: %s", relDistribution.getType());
-    Preconditions.checkState(parent instanceof Join, "Expected parent to be 
join. Found: %s", parent);
-    Join parentJoin = (Join) parent;
-    // TODO(mse-physical): add support for sub-partitioning and coalescing 
exchange.
-    HashDistributionDesc hashDistToMatch = 
getLeftInputHashDistributionDesc(parentJoin).orElseThrow();
+    Preconditions.checkState(parent instanceof Join || parent instanceof SetOp,
+        "Expected parent to be Join/SetOp. Found: %s", parent);
+    HashDistributionDesc hashDistToMatch;
+    if (parent instanceof Join) {
+      Join join = (Join) parent.unwrap();
+      hashDistToMatch = getLeftInputHashDistributionDesc(join)
+          .orElseThrow(() -> new IllegalStateException("Join left input does 
not have hash distribution desc"));
+    } else {
+      SetOp setOp = (SetOp) parent.unwrap();
+      hashDistToMatch = getLeftInputHashDistributionDesc(setOp)
+          .orElseThrow(() -> new IllegalStateException("SetOp left input does 
not have hash distribution desc"));
+    }
     if (assumedDistribution.satisfies(relDistribution)) {
       if (parentDistribution.getWorkers().size() == 
assumedDistribution.getWorkers().size()) {
         List<Integer> distKeys = relDistribution.getKeys();
@@ -392,10 +401,17 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
     List<Integer> leftKeys = join.analyzeCondition().leftKeys;
     PRelNode asPRelNode = (PRelNode) join;
     return 
asPRelNode.getPRelInput(0).getPinotDataDistributionOrThrow().getHashDistributionDesc().stream()
-        .filter(desc -> desc.getKeys().equals(leftKeys))
+        .filter(desc -> new HashSet<>(desc.getKeys()).equals(new 
HashSet<>(leftKeys)))
         .findFirst();
   }
 
+  private Optional<HashDistributionDesc> 
getLeftInputHashDistributionDesc(SetOp setOp) {
+    PRelNode asPRelNode = (PRelNode) setOp;
+    int numExpectedKeys = setOp.getRowType().getFieldCount();
+    return 
asPRelNode.getPinotDataDistributionOrThrow().getHashDistributionDesc().stream()
+        .filter(desc -> desc.getKeys().size() == numExpectedKeys).findFirst();
+  }
+
   /**
    * Computes the PinotDataDistribution of the given node from the input node. 
This assumes that all traits of the
    * input node are already satisfied.
diff --git 
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json 
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index b669891ce5..ca7dc97307 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -203,6 +203,31 @@
           "\n              PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
+      },
+      {
+        "description": "Same query as above, but union does not have all. 
Union gets decomposed into a union-all and a group-by",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR with teamOne 
as (select /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col1, 
percentile(col3, 50) as sum_of_runs from a group by col1), teamTwo as (select 
/*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col1, percentile(col3, 
50) as sum_of_runs from a group by col1), all as (select col1, sum_of_runs from 
teamOne union select col1, sum_of_runs from teamTwo) select /*+ 
aggOption(is_skip_leaf_stage_group_by='true') [...]
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], 
aggType=[FINAL])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n      PhysicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)], 
aggType=[LEAF])",
+          "\n        PhysicalProject(col1=[$0], sum_of_runs=[$1], $f2=[50])",
+          "\n          PhysicalAggregate(group=[{0, 1}], aggType=[FINAL])",
+          "\n            
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+          "\n              PhysicalAggregate(group=[{0, 1}], aggType=[LEAF])",
+          "\n                PhysicalUnion(all=[true])",
+          "\n                  PhysicalAggregate(group=[{0}], 
agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])",
+          "\n                    
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n                      PhysicalProject(col1=[$0], col3=[$2], 
$f2=[50])",
+          "\n                        PhysicalTableScan(table=[[default, a]])",
+          "\n                  PhysicalAggregate(group=[{0}], 
agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])",
+          "\n                    
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n                      PhysicalProject(col1=[$0], col3=[$2], 
$f2=[50])",
+          "\n                        PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       }
     ]
   },
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 236e1e1683..dfb88fe10c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -595,6 +595,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
       public boolean _keepOutputRowOrder;
       @JsonProperty("expectedNumSegments")
       public Integer _expectedNumSegments;
+      @JsonProperty("ignoreV2Optimizer")
+      public Boolean _ignoreV2Optimizer = false;
     }
 
     public static class ColumnAndType {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 5210b30b0d..9be04a5b49 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -59,6 +59,8 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -67,6 +69,7 @@ import org.testng.annotations.Test;
 
 
 public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResourceBasedQueriesTest.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final Pattern TABLE_NAME_REPLACE_PATTERN = 
Pattern.compile("\\{([\\w\\d]+)\\}");
   private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
@@ -265,7 +268,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   // TODO: name the test using testCaseName for testng reports
   @Test(dataProvider = "testResourceQueryTestCaseProviderInputOnly")
   public void testQueryTestCasesWithH2(String testCaseName, boolean isIgnored, 
String sql, String h2Sql, String expect,
-      boolean keepOutputRowOrder)
+      boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
       throws Exception {
     // query pinot
     runQuery(sql, expect, false).ifPresent(queryResult -> {
@@ -277,6 +280,26 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
     });
   }
 
+  // TODO: name the test using testCaseName for testng reports
+  @Test(dataProvider = "testResourceQueryTestCaseProviderInputOnly")
+  public void testQueryTestCasesWithH2WithNewOptimizer(String testCaseName, 
boolean isIgnored, String sql, String h2Sql,
+      String expect, boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
+      throws Exception {
+    // query pinot
+    if (ignoreV2Optimizer) {
+      LOGGER.warn("Ignoring query for test-case ({}): with v2 optimizer: {}", 
testCaseName, sql);
+      return;
+    }
+    sql = String.format("SET usePhysicalOptimizer=true; %s", sql);
+    runQuery(sql, expect, false).ifPresent(queryResult -> {
+      try {
+        compareRowEquals(queryResult.getResultTable(), queryH2(h2Sql), 
keepOutputRowOrder);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage(), e);
+      }
+    });
+  }
+
   @Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
   public void testQueryTestCasesWithOutput(String testCaseName, boolean 
isIgnored, String sql, String h2Sql,
       List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder)
@@ -489,7 +512,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
           String h2Sql = queryCase._h2Sql != null ? 
replaceTableName(testCaseName, queryCase._h2Sql)
               : replaceTableName(testCaseName, queryCase._sql);
           Object[] testEntry = new Object[]{
-              testCaseName, queryCase._ignored, sql, h2Sql, 
queryCase._expectedException, queryCase._keepOutputRowOrder
+              testCaseName, queryCase._ignored, sql, h2Sql, 
queryCase._expectedException, queryCase._keepOutputRowOrder,
+              queryCase._ignoreV2Optimizer
           };
           providerContent.add(testEntry);
         }
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 56a76ab2d0..2af747c270 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -54,12 +54,14 @@
       {
         "description": "Wrong partition key",
         "sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM 
{tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='name', 
partition_size='4') */ GROUP BY {tbl1}.num",
-        "expectedException": "Error composing query plan for.*"
+        "expectedException": "Error composing query plan for.*",
+        "ignoreV2Optimizer": true
       },
       {
         "description": "Wrong partition size",
         "sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM 
{tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', 
partition_size='2') */ GROUP BY {tbl1}.num",
-        "expectedException": "Error composing query plan for.*"
+        "expectedException": "Error composing query plan for.*",
+        "ignoreV2Optimizer": true
       },
       {
         "description": "Group by partition column",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to