This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 422006a8090bb7ee4576f4dbb2d6cbd3152ffded Author: Tamas Mate <[email protected]> AuthorDate: Mon Feb 27 10:05:51 2023 +0100 IMPALA-11950: Planner change for Iceberg metadata querying This commit extends the planner with IcebergMetadataScanNode, which will be used to scan Iceberg metadata tables (IMPALA-10947). The scan node is only implemented on the frontend side in this patch; the backend part will be developed in IMPALA-11996. To avoid executing the plan, there is a hardcoded condition; it is placed after the explain part, so the change remains testable with EXPLAIN queries. Testing: - Added planner tests Change-Id: I3675d7a57ca570bfec306798589b5ef6aa34b5c6 Reviewed-on: http://gerrit.cloudera.org:8080/19547 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../catalog/iceberg/IcebergMetadataTable.java | 20 +++- .../apache/impala/planner/DistributedPlanner.java | 10 ++ .../impala/planner/IcebergMetadataScanNode.java | 78 ++++++++++++++ .../java/org/apache/impala/planner/ScanNode.java | 6 +- .../apache/impala/planner/SingleNodePlanner.java | 13 ++- .../java/org/apache/impala/service/Frontend.java | 14 ++- .../org/apache/impala/planner/PlannerTest.java | 9 ++ .../PlannerTest/iceberg-metadata-table-scan.test | 114 +++++++++++++++++++++ 8 files changed, 251 insertions(+), 13 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java index 42756c62c..b7d0dcbe0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java @@ -25,6 +25,7 @@ import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.impala.analysis.TableName; import org.apache.impala.catalog.Column; 
import org.apache.impala.catalog.FeCatalogUtils; import org.apache.impala.catalog.FeIcebergTable; @@ -45,12 +46,14 @@ import com.google.common.base.Preconditions; */ public class IcebergMetadataTable extends VirtualTable { private FeIcebergTable baseTable_; + private String metadataTableName_; public IcebergMetadataTable(FeTable baseTable, String metadataTableTypeStr) throws ImpalaRuntimeException { super(null, baseTable.getDb(), baseTable.getName(), baseTable.getOwnerUser()); Preconditions.checkArgument(baseTable instanceof FeIcebergTable); baseTable_ = (FeIcebergTable) baseTable; + metadataTableName_ = metadataTableTypeStr; MetadataTableType type = MetadataTableType.from(metadataTableTypeStr.toUpperCase()); Preconditions.checkNotNull(type); Table metadataTable = MetadataTableUtils.createMetadataTableInstance( @@ -71,6 +74,16 @@ public class IcebergMetadataTable extends VirtualTable { return baseTable_; } + @Override + public String getFullName() { + return super.getFullName() + "." + metadataTableName_; + } + + @Override + public TableName getTableName() { + return new TableName(db_.getName(), name_, metadataTableName_); + } + @Override public TTableStats getTTableStats() { long totalBytes = 0; @@ -79,9 +92,14 @@ public class IcebergMetadataTable extends VirtualTable { return ret; } + @Override + public org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable() { + return baseTable_.getMetaStoreTable(); + } + /** * Return same descriptor as the base table, but with a schema that corresponds to - * the metadtata table schema. + * the metadata table schema. 
*/ @Override public TTableDescriptor toThriftDescriptor(int tableId, diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 67a376a7e..56e54a7b2 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -147,6 +147,9 @@ public class DistributedPlanner { ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED); } else if (root instanceof CardinalityCheckNode) { result = createCardinalityCheckNodeFragment((CardinalityCheckNode) root, childFragments); + } else if (root instanceof IcebergMetadataScanNode) { + result = createIcebergMetadataScanFragment(root); + fragments.add(result); } else { throw new InternalException("Cannot create plan fragment for this node type: " + root.getExplainString(ctx_.getQueryOptions())); @@ -325,6 +328,13 @@ public class DistributedPlanner { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM); } + /** + * Create an Iceberg Metadata scan fragment. + */ + private PlanFragment createIcebergMetadataScanFragment(PlanNode node) { + return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); + } + /** * Adds the SubplanNode as the new plan root to the child fragment and returns * the child fragment. diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java new file mode 100644 index 000000000..4a776786e --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.planner; + +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TPlanNode; +import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.thrift.TScanRangeSpec; + +import com.google.common.base.Preconditions; + +public class IcebergMetadataScanNode extends ScanNode { + + protected IcebergMetadataScanNode(PlanNodeId id, TupleDescriptor desc) { + super(id, desc, "SCAN ICEBERG METADATA"); + } + + @Override + public void init(Analyzer analyzer) throws ImpalaException { + super.init(analyzer); + scanRangeSpecs_ = new TScanRangeSpec(); + computeMemLayout(analyzer); + computeStats(analyzer); + } + + @Override + protected String getDisplayLabelDetail() { + Preconditions.checkNotNull(desc_.getPath()); + if (desc_.hasExplicitAlias()) { + return desc_.getPath().toString() + " " + desc_.getAlias(); + } else { + return desc_.getPath().toString(); + } + } + + @Override + protected String getNodeExplainString(String rootPrefix, String detailPrefix, + TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + output.append(String.format("%s%s [%s]\n", rootPrefix, getDisplayLabel(), + getDisplayLabelDetail())); + return output.toString(); + } + + @Override + protected void 
toThrift(TPlanNode msg) { + // Implement for fragment execution + } + + @Override + public void computeNodeResourceProfile(TQueryOptions queryOptions) { + nodeResourceProfile_ = ResourceProfile.noReservation(0); + } + + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = computeDefaultProcessingCost(); + } + +} diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index 59ebb179c..d9b3ac49b 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -310,10 +310,6 @@ abstract public class ScanNode extends PlanNode { @Override protected String getDisplayLabelDetail() { - FeTable table = desc_.getTable(); - List<String> path = new ArrayList<>(); - path.add(table.getDb().getName()); - path.add(table.getName()); Preconditions.checkNotNull(desc_.getPath()); if (desc_.hasExplicitAlias()) { return desc_.getPath().toString() + " " + desc_.getAlias(); @@ -391,7 +387,7 @@ abstract public class ScanNode extends PlanNode { /** * Estimate per-row cost as 1 per 1KB row size. * <p> - * This reflect deserialization/copy cost per row. + * This reflects deserialization/copy cost per row. */ private float rowMaterializationCost() { return getAvgRowSize() / 1024; } diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 388d08094..71c7af2e5 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -863,7 +863,6 @@ public class SingleNodePlanner { // If the collection is within a (possibly nested) struct, add the tuple in // which the top level struct is located. 
SlotDescriptor desc = collectionExpr.getDesc(); - List<TupleDescriptor> enclosingTupleDescs = desc.getEnclosingTupleDescs(); TupleDescriptor topTuple = desc.getTopEnclosingTupleDesc(); requiredTids.add(topTuple.getId()); } else { @@ -1900,6 +1899,14 @@ public class SingleNodePlanner { } } + private PlanNode createIcebergMetadataScanNode(TableRef tblRef, Analyzer analyzer) + throws ImpalaException { + IcebergMetadataScanNode icebergMetadataScanNode = + new IcebergMetadataScanNode(ctx_.getNextNodeId(), tblRef.getDesc()); + icebergMetadataScanNode.init(analyzer); + return icebergMetadataScanNode; + } + /** * Returns all applicable conjuncts for join between two plan trees 'materializing' the * given left-hand and right-hand side table ref ids. The conjuncts either come from @@ -2213,9 +2220,7 @@ public class SingleNodePlanner { result = new SingularRowSrcNode(ctx_.getNextNodeId(), ctx_.getSubplan()); result.init(analyzer); } else if (tblRef instanceof IcebergMetadataTableRef) { - throw new NotImplementedException(String.format("'%s' refers to a metadata table " - + "which is currently not supported.", String.join(".", - tblRef.getPath()))); + result = createIcebergMetadataScanNode(tblRef, analyzer); } else { throw new NotImplementedException( "Planning not implemented for table ref class: " + tblRef.getClass()); diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index d51e34897..8829beab9 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -124,6 +124,7 @@ import org.apache.impala.catalog.ImpaladTableUsageTracker; import org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.MetaStoreClientPool; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; +import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import 
org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.local.InconsistentMetadataFetchException; @@ -290,9 +291,8 @@ public class Frontend { protected long estimated_memory_per_host_ = -1; // The processing cores required to execute the query. - // Certain queries such as EXPLAIN that do not populate - // TExecRequest.query_exec_request. Therefore, cores requirement will be set here - // through setCoresRequired(). + // Certain queries such as EXPLAIN do not populate TExecRequest.query_exec_request. + // Therefore, cores requirement will be set here through setCoresRequired(). protected int cores_required_ = -1; // The initial length of content in explain buffer to help return the buffer @@ -2470,6 +2470,14 @@ public class Frontend { return result; } + // Blocking query execution for queries that contain IcebergMetadataTables + for (FeTable table : stmtTableCache.tables.values()) { + if (table instanceof IcebergMetadataTable) { + throw new NotImplementedException(String.format("'%s' refers to a metadata " + + "table which is currently not supported.", table.getFullName())); + } + } + result.setQuery_exec_request(queryExecRequest); if (analysisResult.isQueryStmt()) { result.stmt_type = TStmtType.QUERY; diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 6b4eba927..a3b91eaf5 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1282,6 +1282,15 @@ public class PlannerTest extends PlannerTestBase { ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } + /** + * Check that Iceberg metadata table scan plans are as expected. 
+ */ + @Test + public void testIcebergMetadataTableScans() { + runPlannerTestFile("iceberg-metadata-table-scan", "functional_parquet", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); + } + /** * Test limit pushdown into analytic sort in isolation. */ diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test new file mode 100644 index 000000000..f976cb75e --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test @@ -0,0 +1,114 @@ +explain SELECT * FROM functional_parquet.iceberg_alltypes_part_orc.history +---- PLAN +PLAN-ROOT SINK +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history] + row-size=33B cardinality=unavailable +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history] + row-size=33B cardinality=unavailable +==== +explain select * +from functional_parquet.iceberg_alltypes_part_orc.history q + join functional_parquet.iceberg_alltypes_part_orc.history z + on z.snapshot_id = q.snapshot_id +---- PLAN +PLAN-ROOT SINK +| +02:NESTED LOOP JOIN [CROSS JOIN] +| row-size=66B cardinality=unavailable +| +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +| row-size=33B cardinality=unavailable +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] + row-size=33B cardinality=unavailable +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:EXCHANGE [UNPARTITIONED] +| +02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] +| row-size=66B cardinality=unavailable +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +| row-size=33B cardinality=unavailable +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history 
q] + row-size=33B cardinality=unavailable +==== +explain select q.snapshot_id, z.made_current_at as test1, z.made_current_at as test2 +from functional_parquet.iceberg_alltypes_part_orc.history q + join functional_parquet.iceberg_alltypes_part_orc.history z + on z.snapshot_id = q.snapshot_id +---- PLAN +PLAN-ROOT SINK +| +02:NESTED LOOP JOIN [CROSS JOIN] +| row-size=24B cardinality=unavailable +| +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +| row-size=16B cardinality=unavailable +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] + row-size=8B cardinality=unavailable +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:EXCHANGE [UNPARTITIONED] +| +02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] +| row-size=24B cardinality=unavailable +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +| row-size=16B cardinality=unavailable +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] + row-size=8B cardinality=unavailable +==== +explain select * from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries +---- PLAN +PLAN-ROOT SINK +| +01:SUBPLAN +| row-size=98B cardinality=unavailable +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | row-size=98B cardinality=10 +| | +| |--02:SINGULAR ROW SRC +| | row-size=72B cardinality=1 +| | +| 03:UNNEST [a.partition_summaries] +| row-size=0B cardinality=10 +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.manifests a] + row-size=72B cardinality=unavailable +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +05:EXCHANGE [UNPARTITIONED] +| +01:SUBPLAN +| row-size=98B cardinality=unavailable +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | row-size=98B cardinality=10 +| | +| |--02:SINGULAR ROW SRC +| | row-size=72B cardinality=1 +| | +| 03:UNNEST [a.partition_summaries] +| row-size=0B cardinality=10 +| +00:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_alltypes_part_orc.manifests a] + row-size=72B cardinality=unavailable +==== \ No newline at end of file
