This is an automated email from the ASF dual-hosted git repository.
chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f444fc2908 [multistage] Fix Bugs in SetOp Handling and Multi-Column Join (#16330)
f444fc2908 is described below
commit f444fc2908c8879da1894515aec4315ff55d12cd
Author: Ankit Sultana <[email protected]>
AuthorDate: Fri Jul 11 18:02:52 2025 -0500
[multistage] Fix Bugs in SetOp Handling and Multi-Column Join (#16330)
* [multistage] Fix Bugs in SetOp Handling and Multi-Column Join
* fix union-all handling
---
.../pinot/calcite/rel/traits/TraitAssignment.java | 24 ++++++
.../planner/physical/v2/RelToPRelConverter.java | 9 ++-
.../physical/v2/nodes/PhysicalIntersect.java | 86 ++++++++++++++++++++++
.../v2/opt/rules/WorkerExchangeAssignmentRule.java | 26 +++++--
.../resources/queries/PhysicalOptimizerPlans.json | 25 +++++++
.../query/runtime/queries/QueryRunnerTestBase.java | 2 +
.../runtime/queries/ResourceBasedQueriesTest.java | 28 ++++++-
.../src/test/resources/queries/QueryHints.json | 6 +-
8 files changed, 196 insertions(+), 10 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
index a7ffe7e482..4ce4a1cf53 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java
@@ -33,7 +33,10 @@ import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
@@ -83,10 +86,31 @@ public class TraitAssignment {
return (PRelNode) assignAggregate((PhysicalAggregate) relNode);
} else if (relNode instanceof PhysicalWindow) {
return (PRelNode) assignWindow((PhysicalWindow) relNode);
+ } else if (relNode instanceof SetOp) {
+ return (PRelNode) assignSetOp((SetOp) relNode);
}
return (PRelNode) relNode;
}
+
+  /**
+   * Assigns traits for a set operation by pushing a hash distribution on all of its output columns down to
+   * every input, so rows that are equal on every column are co-located. UNION ALL is returned unchanged: it may
+   * return duplicates, so it needs no distribution trait.
+   *
+   * @param setOp the set operation whose inputs should receive the distribution trait
+   * @return {@code setOp} itself for UNION ALL; otherwise a copy of it (same trait set) whose inputs carry the
+   *     all-columns hash trait
+   */
+  RelNode assignSetOp(SetOp setOp) {
+    boolean isUnionAll = setOp instanceof Union && ((Union) setOp).all;
+    if (isUnionAll) {
+      // UNION ALL means we can return duplicates, so no trait required.
+      return setOp;
+    }
+    int numFields = setOp.getRowType().getFieldCount();
+    RelDistribution allColumnsHash = RelDistributions.hash(ImmutableIntList.range(0, numFields));
+    List<RelNode> rewrittenInputs = new ArrayList<>();
+    for (RelNode setOpInput : setOp.getInputs()) {
+      RelTraitSet withHashTrait = setOpInput.getTraitSet().plus(allColumnsHash);
+      rewrittenInputs.add(setOpInput.copy(withHashTrait, setOpInput.getInputs()));
+    }
+    return setOp.copy(setOp.getTraitSet(), rewrittenInputs);
+  }
+
/**
* Sort is always computed by coalescing to a single stream. Hence, we add a
SINGLETON trait to the sort input.
*/
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
index 5e142c2620..09323341b1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AsofJoin;
import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Intersect;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.Project;
@@ -41,7 +42,9 @@ import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalFilter;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalIntersect;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalMinus;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
@@ -119,8 +122,12 @@ public class RelToPRelConverter {
nodeIdGenerator.get(), null);
} else if (relNode instanceof Minus) {
Minus minus = (Minus) relNode;
- return new PhysicalUnion(minus.getCluster(), minus.getTraitSet(),
minus.getHints(), minus.all, inputs,
+ return new PhysicalMinus(minus.getCluster(), minus.getTraitSet(),
minus.getHints(), inputs, minus.all,
nodeIdGenerator.get(), null);
+ } else if (relNode instanceof Intersect) {
+ Intersect intersect = (Intersect) relNode;
+ return new PhysicalIntersect(intersect.getCluster(),
intersect.getTraitSet(), intersect.getHints(), inputs,
+ intersect.all, nodeIdGenerator.get(), null);
} else if (relNode instanceof Sort) {
Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input
of sort. Found: %s", inputs);
Sort sort = (Sort) relNode;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalIntersect.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalIntersect.java
new file mode 100644
index 0000000000..f243e5453e
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalIntersect.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.nodes;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
+
+
+/**
+ * Physical counterpart of Calcite's {@link Intersect} used by the v2 physical optimizer.
+ *
+ * <p>Besides the unwrapped Calcite inputs held by {@link Intersect}, it keeps the {@link PRelNode} view of its
+ * inputs, a node id and an optional {@link PinotDataDistribution}. {@link #isLeafStage()} always returns false.
+ */
+public class PhysicalIntersect extends Intersect implements PRelNode {
+  private final int _nodeId;
+  // Physical inputs exactly as supplied by the caller; Calcite's input list holds their unwrapped RelNodes.
+  private final List<PRelNode> _pRelInputs;
+  @Nullable
+  private final PinotDataDistribution _pinotDataDistribution;
+
+  public PhysicalIntersect(RelOptCluster cluster, RelTraitSet traits, List<RelHint> hints, List<PRelNode> inputs,
+      boolean all, int nodeId, @Nullable PinotDataDistribution pinotDataDistribution) {
+    super(cluster, traits, hints, toCalciteInputs(inputs), all);
+    _pRelInputs = inputs;
+    _nodeId = nodeId;
+    _pinotDataDistribution = pinotDataDistribution;
+  }
+
+  /** Returns the Calcite node wrapped by each physical node, preserving order. */
+  private static List<RelNode> toCalciteInputs(List<PRelNode> physicalNodes) {
+    return physicalNodes.stream().map(PRelNode::unwrap).collect(Collectors.toList());
+  }
+
+  /** Casts each Calcite node back to {@link PRelNode}, preserving order. */
+  private static List<PRelNode> toPhysicalInputs(List<RelNode> relNodes) {
+    return relNodes.stream().map(PRelNode.class::cast).collect(Collectors.toList());
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    // Node id and data distribution carry over unchanged; only traits, inputs and the ALL flag are replaced.
+    return new PhysicalIntersect(getCluster(), traitSet, getHints(), toPhysicalInputs(inputs), all, _nodeId,
+        _pinotDataDistribution);
+  }
+
+  @Override
+  public PRelNode with(int newNodeId, List<PRelNode> newInputs, PinotDataDistribution newDistribution) {
+    // Traits, hints and the ALL flag are kept from this node.
+    return new PhysicalIntersect(getCluster(), getTraitSet(), getHints(), newInputs, all, newNodeId,
+        newDistribution);
+  }
+
+  @Override
+  public int getNodeId() {
+    return _nodeId;
+  }
+
+  @Override
+  public List<PRelNode> getPRelInputs() {
+    return _pRelInputs;
+  }
+
+  @Nullable
+  @Override
+  public PinotDataDistribution getPinotDataDistribution() {
+    return _pinotDataDistribution;
+  }
+
+  @Override
+  public RelNode unwrap() {
+    return this;
+  }
+
+  @Override
+  public boolean isLeafStage() {
+    return false;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
index 2f338083bd..15cd43c028 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/WorkerExchangeAssignmentRule.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.Values;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
@@ -334,10 +335,18 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
}
Preconditions.checkState(relDistribution.getType() ==
RelDistribution.Type.HASH_DISTRIBUTED,
"Unexpected distribution constraint: %s", relDistribution.getType());
- Preconditions.checkState(parent instanceof Join, "Expected parent to be
join. Found: %s", parent);
- Join parentJoin = (Join) parent;
- // TODO(mse-physical): add support for sub-partitioning and coalescing
exchange.
- HashDistributionDesc hashDistToMatch =
getLeftInputHashDistributionDesc(parentJoin).orElseThrow();
+ Preconditions.checkState(parent instanceof Join || parent instanceof SetOp,
+ "Expected parent to be Join/SetOp. Found: %s", parent);
+ HashDistributionDesc hashDistToMatch;
+ if (parent instanceof Join) {
+ Join join = (Join) parent.unwrap();
+ hashDistToMatch = getLeftInputHashDistributionDesc(join)
+ .orElseThrow(() -> new IllegalStateException("Join left input does
not have hash distribution desc"));
+ } else {
+ SetOp setOp = (SetOp) parent.unwrap();
+ hashDistToMatch = getLeftInputHashDistributionDesc(setOp)
+ .orElseThrow(() -> new IllegalStateException("SetOp left input does
not have hash distribution desc"));
+ }
if (assumedDistribution.satisfies(relDistribution)) {
if (parentDistribution.getWorkers().size() ==
assumedDistribution.getWorkers().size()) {
List<Integer> distKeys = relDistribution.getKeys();
@@ -392,10 +401,17 @@ public class WorkerExchangeAssignmentRule implements PRelNodeTransformer {
List<Integer> leftKeys = join.analyzeCondition().leftKeys;
PRelNode asPRelNode = (PRelNode) join;
return
asPRelNode.getPRelInput(0).getPinotDataDistributionOrThrow().getHashDistributionDesc().stream()
- .filter(desc -> desc.getKeys().equals(leftKeys))
+ .filter(desc -> new HashSet<>(desc.getKeys()).equals(new
HashSet<>(leftKeys)))
.findFirst();
}
+
+  /**
+   * Returns the hash distribution of the set-op's left-most input (input 0) whose key count equals the set-op's
+   * field count, i.e. a hash on every output column (assuming the desc's keys are distinct). This is the
+   * SetOp counterpart of {@link #getLeftInputHashDistributionDesc(Join)}.
+   *
+   * @param setOp a set operation that is also a {@link PRelNode}
+   * @return the matching hash distribution of input 0, or empty if it has none
+   */
+  private Optional<HashDistributionDesc> getLeftInputHashDistributionDesc(SetOp setOp) {
+    PRelNode asPRelNode = (PRelNode) setOp;
+    int numExpectedKeys = setOp.getRowType().getFieldCount();
+    // Read the left input's distribution, as the Join overload does. The previous code read the set-op's own
+    // distribution, which contradicts this method's name and contract.
+    return asPRelNode.getPRelInput(0).getPinotDataDistributionOrThrow().getHashDistributionDesc().stream()
+        .filter(desc -> desc.getKeys().size() == numExpectedKeys)
+        .findFirst();
+  }
+
/**
* Computes the PinotDataDistribution of the given node from the input node.
This assumes that all traits of the
* input node are already satisfied.
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index b669891ce5..ca7dc97307 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -203,6 +203,31 @@
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
+ },
+ {
+ "description": "Same query as above, but union does not have all.
Union gets decomposed into a union-all and a group-by",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR with teamOne
as (select /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col1,
percentile(col3, 50) as sum_of_runs from a group by col1), teamTwo as (select
/*+ aggOptions(is_skip_leaf_stage_group_by='true') */ col1, percentile(col3,
50) as sum_of_runs from a group by col1), all as (select col1, sum_of_runs from
teamOne union select col1, sum_of_runs from teamTwo) select /*+
aggOption(is_skip_leaf_stage_group_by='true') [...]
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[PERCENTILE($1, 50)],
aggType=[LEAF])",
+ "\n PhysicalProject(col1=[$0], sum_of_runs=[$1], $f2=[50])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF])",
+ "\n PhysicalUnion(all=[true])",
+ "\n PhysicalAggregate(group=[{0}],
agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalProject(col1=[$0], col3=[$2],
$f2=[50])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n PhysicalAggregate(group=[{0}],
agg#0=[PERCENTILE($1, 50)], aggType=[DIRECT])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalProject(col1=[$0], col3=[$2],
$f2=[50])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
}
]
},
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 236e1e1683..dfb88fe10c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -595,6 +595,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
public boolean _keepOutputRowOrder;
@JsonProperty("expectedNumSegments")
public Integer _expectedNumSegments;
+ @JsonProperty("ignoreV2Optimizer")
+ public Boolean _ignoreV2Optimizer = false;
}
public static class ColumnAndType {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 5210b30b0d..9be04a5b49 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -59,6 +59,8 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -67,6 +69,7 @@ import org.testng.annotations.Test;
public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ResourceBasedQueriesTest.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Pattern TABLE_NAME_REPLACE_PATTERN =
Pattern.compile("\\{([\\w\\d]+)\\}");
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
@@ -265,7 +268,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
// TODO: name the test using testCaseName for testng reports
@Test(dataProvider = "testResourceQueryTestCaseProviderInputOnly")
public void testQueryTestCasesWithH2(String testCaseName, boolean isIgnored,
String sql, String h2Sql, String expect,
- boolean keepOutputRowOrder)
+ boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
throws Exception {
// query pinot
runQuery(sql, expect, false).ifPresent(queryResult -> {
@@ -277,6 +280,26 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
});
}
+  /**
+   * Runs the H2-compared resource query cases again, this time with the physical (v2) optimizer enabled by
+   * prefixing each query with {@code SET usePhysicalOptimizer=true;}. Cases flagged with
+   * {@code ignoreV2Optimizer} are only logged, not executed.
+   */
+  // TODO: name the test using testCaseName for testng reports
+  @Test(dataProvider = "testResourceQueryTestCaseProviderInputOnly")
+  public void testQueryTestCasesWithH2WithNewOptimizer(String testCaseName, boolean isIgnored, String sql,
+      String h2Sql, String expect, boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
+      throws Exception {
+    if (ignoreV2Optimizer) {
+      LOGGER.warn("Ignoring query for test-case ({}): with v2 optimizer: {}", testCaseName, sql);
+    } else {
+      // Query Pinot with the physical optimizer enabled, then compare the returned rows against H2.
+      String physicalOptimizerSql = String.format("SET usePhysicalOptimizer=true; %s", sql);
+      runQuery(physicalOptimizerSql, expect, false).ifPresent(queryResult -> {
+        try {
+          compareRowEquals(queryResult.getResultTable(), queryH2(h2Sql), keepOutputRowOrder);
+        } catch (Exception e) {
+          Assert.fail(e.getMessage(), e);
+        }
+      });
+    }
+  }
+
@Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
public void testQueryTestCasesWithOutput(String testCaseName, boolean
isIgnored, String sql, String h2Sql,
List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder)
@@ -489,7 +512,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
String h2Sql = queryCase._h2Sql != null ?
replaceTableName(testCaseName, queryCase._h2Sql)
: replaceTableName(testCaseName, queryCase._sql);
Object[] testEntry = new Object[]{
- testCaseName, queryCase._ignored, sql, h2Sql,
queryCase._expectedException, queryCase._keepOutputRowOrder
+ testCaseName, queryCase._ignored, sql, h2Sql,
queryCase._expectedException, queryCase._keepOutputRowOrder,
+ queryCase._ignoreV2Optimizer
};
providerContent.add(testEntry);
}
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 56a76ab2d0..2af747c270 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -54,12 +54,14 @@
{
"description": "Wrong partition key",
"sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM
{tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='name',
partition_size='4') */ GROUP BY {tbl1}.num",
- "expectedException": "Error composing query plan for.*"
+ "expectedException": "Error composing query plan for.*",
+ "ignoreV2Optimizer": true
},
{
"description": "Wrong partition size",
"sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM
{tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num',
partition_size='2') */ GROUP BY {tbl1}.num",
- "expectedException": "Error composing query plan for.*"
+ "expectedException": "Error composing query plan for.*",
+ "ignoreV2Optimizer": true
},
{
"description": "Group by partition column",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]