This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6ad08a7016 Fix colocated join when no mapping project is used (#13666)
6ad08a7016 is described below
commit 6ad08a7016a19c34e2cce2c0834ac38b451798e4
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Jul 30 11:32:48 2024 +0200
Fix colocated join when no mapping project is used (#13666)
---
.../rel/rules/PinotRelDistributionTraitRule.java | 21 ++++++++-
.../resources/queries/ExplainPhysicalPlans.json | 52 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 1 deletion(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
index 821d1cf5ad..8fbd8da202 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
@@ -29,17 +29,23 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.mapping.IntPair;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -52,6 +58,7 @@ import org.apache.pinot.query.planner.plannode.AggregateNode;
public class PinotRelDistributionTraitRule extends RelOptRule {
public static final PinotRelDistributionTraitRule INSTANCE =
new PinotRelDistributionTraitRule(PinotRuleUtils.PINOT_REL_FACTORY);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotRelDistributionTraitRule.class);
public PinotRelDistributionTraitRule(RelBuilderFactory factory) {
super(operand(RelNode.class, any()));
@@ -110,10 +117,22 @@ public class PinotRelDistributionTraitRule extends
RelOptRule {
LogicalProject project = (LogicalProject) node;
try {
if (inputRelDistribution != null) {
- return inputRelDistribution.apply(project.getMapping());
+ Mappings.TargetMapping mapping =
+ Project.getPartialMapping(input.getRowType().getFieldCount(),
project.getProjects());
+ // Note(gonzalo): In Calcite 1.37 mapping.getTargetOpt will fail in
what looks like a Calcite bug.
+ // Therefore we apply a workaround here and create a new mapping where
the same elements (extracted with the
+ // iterator, which actually works) are added to the new mapping.
+ // See
https://lists.apache.org/thread/qz18qxrfp5bqldnoln2tg4582g402zyv
+ Mapping actualMapping =
Mappings.create(MappingType.PARTIAL_FUNCTION,
input.getRowType().getFieldCount(),
+ project.getRowType().getFieldCount());
+ for (IntPair intPair : mapping) {
+ actualMapping.set(intPair.source, intPair.target);
+ }
+ return inputRelDistribution.apply(actualMapping);
}
} catch (Exception e) {
// ... skip;
+ LOGGER.warn("Failed to derive distribution from input for node: {}",
node, e);
}
} else if (node instanceof LogicalFilter) {
assert inputs.size() == 1;
diff --git
a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
index f72f8c1852..31db5ee99b 100644
--- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -449,6 +449,58 @@
" └── [5]@localhost:1|[0] TABLE
SCAN (b) null\n",
""
]
+ },
+ {
+ "description": "explain plan with colocated join and a projection that
is not mapping",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM
(select col2, col3 + col2 as mySum from a /*+
tableOptions(partition_function='hashcode', partition_key='col2',
partition_size='4') */) as a JOIN b /*+
tableOptions(partition_function='hashcode', partition_key='col1',
partition_size='4') */ ON a.col2 = b.col1",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:2|[2]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:2|[3]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:1|[0]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:1|[1]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:1|[1] PROJECT\n",
+ " └── [1]@localhost:1|[1] JOIN\n",
+ " ├── [1]@localhost:1|[1]
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:2|[2]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree
Omitted)\n",
+ " │ ├── [2]@localhost:2|[3]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree
Omitted)\n",
+ " │ ├── [2]@localhost:1|[0]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree
Omitted)\n",
+ " │ └── [2]@localhost:1|[1]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+ " │ └── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a)
null\n",
+ " └── [1]@localhost:1|[1]
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree
Omitted)\n",
+ " ├── [3]@localhost:2|[3]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree
Omitted)\n",
+ " ├── [3]@localhost:1|[0]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree
Omitted)\n",
+ " └── [3]@localhost:1|[1]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+ " └── [3]@localhost:1|[1] PROJECT\n",
+ " └── [3]@localhost:1|[1] TABLE SCAN (b)
null\n"
+ ]
+ },
+ {
+ "description": "explain plan with colocated join and a projection that
doesn't keep the key column",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM
(select col3 as col2, col3 + col2 as mySum from a /*+
tableOptions(partition_function='hashcode', partition_key='col2',
partition_size='4') */) as a JOIN b /*+
tableOptions(partition_function='hashcode', partition_key='col1',
partition_size='4') */ ON a.col2 = b.col1",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] PROJECT\n",
+ " └── [1]@localhost:2|[0] JOIN\n",
+ " ├── [1]@localhost:2|[0]
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:2|[2]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree
Omitted)\n",
+ " │ ├── [2]@localhost:2|[3]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree
Omitted)\n",
+ " │ ├── [2]@localhost:1|[0]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree
Omitted)\n",
+ " │ └── [2]@localhost:1|[1]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " │ └── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a)
null\n",
+ " └── [1]@localhost:2|[0]
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree
Omitted)\n",
+ " ├── [3]@localhost:2|[3]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree
Omitted)\n",
+ " ├── [3]@localhost:1|[0]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree
Omitted)\n",
+ " └── [3]@localhost:1|[1]
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [3]@localhost:1|[1] PROJECT\n",
+ " └── [3]@localhost:1|[1] TABLE SCAN (b)
null\n"
+ ]
}
]
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]