This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 37a8007df0366a8e81ca8ad0f85919c3ba78fba2 Author: Steve Carlin <[email protected]> AuthorDate: Fri Sep 12 17:18:15 2025 -0700 IMPALA-14434: Calcite planner: implement partition key scan optimization Implemented the partition key scan optimization for Calcite planner Most of the code was already in place. Just needed to refactor some code in SingleNodePlanner to make it callable from Calcite and use the already created isPartitionKeyScan method in ImpalaHdfsScanRel. Testing was done by running the Impala e2e tests with the use_calcite_planner flag set to true. Change-Id: I7b5b8a8115f65f6be27a5be0e19f21eebab61a32 Reviewed-on: http://gerrit.cloudera.org:8080/23691 Reviewed-by: Joe McDonnell <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Steve Carlin <[email protected]> --- .../apache/impala/planner/SingleNodePlanner.java | 61 ++++++++++++---------- .../impala/calcite/rel/node/ImpalaHdfsScanRel.java | 14 +++-- .../queries/QueryTest/calcite.test | 20 +++++++ 3 files changed, 65 insertions(+), 30 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 9fbed2c18..bf70f0b37 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -1646,33 +1646,8 @@ public class SingleNodePlanner implements SingleNodePlannerIntf { // try evaluating with metadata first. If not, fall back to scanning. TQueryOptions queryOpts = analyzer.getQueryCtx().client_request.query_options; if (isPartitionKeyScan && queryOpts.optimize_partition_key_scans) { - Set<List<Expr>> uniqueExprs = new HashSet<>(); - - for (FeFsPartition partition : partitions) { - // Ignore empty partitions to match the behavior of the scan based approach. - if (partition.getSize() == 0) continue; - List<Expr> exprs = new ArrayList<>(); - for (SlotDescriptor slotDesc: tupleDesc.getSlots()) { - // UnionNode.init() will go through all the slots in the tuple descriptor so - // there needs to be an entry in 'exprs' for each slot. For unmaterialized - // slots, use dummy null values. UnionNode will filter out unmaterialized slots. - if (!slotDesc.isMaterialized()) { - exprs.add(NullLiteral.create(slotDesc.getType())); - } else { - int pos = slotDesc.getColumn().getPosition(); - exprs.add(partition.getPartitionValue(pos)); - } - } - uniqueExprs.add(exprs); - } - - // Create a UNION node with all unique partition keys. - UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tupleDesc.getId()); - for (List<Expr> exprList: uniqueExprs) { - unionNode.addConstExprList(exprList); - } - unionNode.init(analyzer); - return unionNode; + return createOptimizedPartitionUnionNode(ctx_.getNextNodeId(), partitions, + tupleDesc, analyzer); } else if (addAcidSlotsIfNeeded(analyzer, hdfsTblRef, partitions)) { // We are scanning a full ACID table that has delete delta files. Let's create // a LEFT ANTI JOIN between the insert deltas and delete deltas. @@ -1714,6 +1689,38 @@ public class SingleNodePlanner implements SingleNodePlannerIntf { return true; } + public static PlanNode createOptimizedPartitionUnionNode(PlanNodeId nodeId, + List<? extends FeFsPartition> partitions, TupleDescriptor tupleDesc, + Analyzer analyzer) { + Set<List<Expr>> uniqueExprs = new HashSet<>(); + + for (FeFsPartition partition : partitions) { + // Ignore empty partitions to match the behavior of the scan based approach. + if (partition.getSize() == 0) continue; + List<Expr> exprs = new ArrayList<>(); + for (SlotDescriptor slotDesc: tupleDesc.getSlots()) { + // UnionNode.init() will go through all the slots in the tuple descriptor so + // there needs to be an entry in 'exprs' for each slot. For unmaterialized + // slots, use dummy null values. UnionNode will filter out unmaterialized slots. + if (!slotDesc.isMaterialized()) { + exprs.add(NullLiteral.create(slotDesc.getType())); + } else { + int pos = slotDesc.getColumn().getPosition(); + exprs.add(partition.getPartitionValue(pos)); + } + } + uniqueExprs.add(exprs); + } + + // Create a UNION node with all unique partition keys. + UnionNode unionNode = new UnionNode(nodeId, tupleDesc.getId()); + for (List<Expr> exprList: uniqueExprs) { + unionNode.addConstExprList(exprList); + } + unionNode.init(analyzer); + return unionNode; + } + /* Purposefully made 'public' for third party usage.*/ public static void addAcidSlots(Analyzer analyzer, TableRef hdfsTblRef) throws AnalysisException { diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java index a9b239f95..719b258df 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java @@ -115,9 +115,17 @@ public class ImpalaHdfsScanRel extends TableScan baseTblRef, filterConjuncts, impalaPartitions, partitionConjuncts, context.ctx_); } else { - physicalNode = new ImpalaHdfsScanNode(nodeId, tupleDesc, impalaPartitions, - baseTblRef, null, partitionConjuncts, filterConjuncts, countStarDesc, - isPartitionScanOnly(context, table)); + boolean isPartitionScanOnly = isPartitionScanOnly(context, table); + if (isPartitionScanOnly && + context.ctx_.getQueryOptions().optimize_partition_key_scans) { + physicalNode = + SingleNodePlanner.createOptimizedPartitionUnionNode(nodeId, impalaPartitions, + tupleDesc, analyzer); + } else { + physicalNode = new ImpalaHdfsScanNode(nodeId, tupleDesc, impalaPartitions, + baseTblRef, null, partitionConjuncts, filterConjuncts, countStarDesc, + isPartitionScanOnly(context, table)); + } } physicalNode.setOutputSmap(new ExprSubstitutionMap()); physicalNode.init(analyzer); diff --git a/testdata/workloads/functional-query/queries/QueryTest/calcite.test b/testdata/workloads/functional-query/queries/QueryTest/calcite.test index e90c9b1a7..5c49f1bc5 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/calcite.test +++ b/testdata/workloads/functional-query/queries/QueryTest/calcite.test @@ -1192,3 +1192,23 @@ select '\"'; ---- RESULTS '"' ==== +---- QUERY +# IMPALA-3334: 'optimize_partition_key_scans' is a boolean query option +set explain_level=0; +set optimize_partition_key_scans=true; +explain select min(month), max(year), ndv(day) from functional.alltypesagg; +---- RESULTS: VERIFY_IS_SUBSET +'01:AGGREGATE [FINALIZE]' +'00:UNION' +' constant-operands=11' +==== +---- QUERY +set explain_level=0; +set optimize_partition_key_scans=false; +explain select min(month), max(year), ndv(day) from functional.alltypesagg; +---- RESULTS: VERIFY_IS_SUBSET +'03:AGGREGATE [FINALIZE]' +'02:EXCHANGE [UNPARTITIONED]' +'01:AGGREGATE' +'00:SCAN $FILESYSTEM_NAME [functional.alltypesagg]' +====
