This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 422006a8090bb7ee4576f4dbb2d6cbd3152ffded
Author: Tamas Mate <[email protected]>
AuthorDate: Mon Feb 27 10:05:51 2023 +0100

    IMPALA-11950: Planner change for Iceberg metadata querying
    
    This commit extends the planner with IcebergMetadataScanNode which will
    be used to scan Iceberg metadata tables (IMPALA-10947). The scan node is
    only implemented on the frontend side in this patch, the backend part
    will be developed in IMPALA-11996.
    
    To avoid executing the plan, there is a hardcoded condition; it is placed
    after the explain part, so the change remains testable with EXPLAIN queries.
    
    Testing:
     - Added planner tests
    
    Change-Id: I3675d7a57ca570bfec306798589b5ef6aa34b5c6
    Reviewed-on: http://gerrit.cloudera.org:8080/19547
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../catalog/iceberg/IcebergMetadataTable.java      |  20 +++-
 .../apache/impala/planner/DistributedPlanner.java  |  10 ++
 .../impala/planner/IcebergMetadataScanNode.java    |  78 ++++++++++++++
 .../java/org/apache/impala/planner/ScanNode.java   |   6 +-
 .../apache/impala/planner/SingleNodePlanner.java   |  13 ++-
 .../java/org/apache/impala/service/Frontend.java   |  14 ++-
 .../org/apache/impala/planner/PlannerTest.java     |   9 ++
 .../PlannerTest/iceberg-metadata-table-scan.test   | 114 +++++++++++++++++++++
 8 files changed, 251 insertions(+), 13 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
index 42756c62c..b7d0dcbe0 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.impala.analysis.TableName;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeIcebergTable;
@@ -45,12 +46,14 @@ import com.google.common.base.Preconditions;
  */
 public class IcebergMetadataTable extends VirtualTable {
   private FeIcebergTable baseTable_;
+  private String metadataTableName_;
 
   public IcebergMetadataTable(FeTable baseTable, String metadataTableTypeStr)
       throws ImpalaRuntimeException {
     super(null, baseTable.getDb(), baseTable.getName(), 
baseTable.getOwnerUser());
     Preconditions.checkArgument(baseTable instanceof FeIcebergTable);
     baseTable_ = (FeIcebergTable) baseTable;
+    metadataTableName_ = metadataTableTypeStr;
     MetadataTableType type = 
MetadataTableType.from(metadataTableTypeStr.toUpperCase());
     Preconditions.checkNotNull(type);
     Table metadataTable = MetadataTableUtils.createMetadataTableInstance(
@@ -71,6 +74,16 @@ public class IcebergMetadataTable extends VirtualTable {
     return baseTable_;
   }
 
+  @Override
+  public String getFullName() {
+    return super.getFullName() + "." + metadataTableName_;
+  }
+
+  @Override
+  public TableName getTableName() {
+    return new TableName(db_.getName(), name_, metadataTableName_);
+  }
+
   @Override
   public TTableStats getTTableStats() {
     long totalBytes = 0;
@@ -79,9 +92,14 @@ public class IcebergMetadataTable extends VirtualTable {
     return ret;
   }
 
+  @Override
+  public org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable() {
+    return baseTable_.getMetaStoreTable();
+  }
+
   /**
    * Return same descriptor as the base table, but with a schema that 
corresponds to
-   * the metadtata table schema.
+   * the metadata table schema.
    */
   @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 67a376a7e..56e54a7b2 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -147,6 +147,9 @@ public class DistributedPlanner {
           ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
     } else if (root instanceof CardinalityCheckNode) {
       result = createCardinalityCheckNodeFragment((CardinalityCheckNode) root, 
childFragments);
+    } else if (root instanceof IcebergMetadataScanNode) {
+      result = createIcebergMetadataScanFragment(root);
+      fragments.add(result);
     } else {
       throw new InternalException("Cannot create plan fragment for this node 
type: "
           + root.getExplainString(ctx_.getQueryOptions()));
@@ -325,6 +328,13 @@ public class DistributedPlanner {
     return new PlanFragment(ctx_.getNextFragmentId(), node, 
DataPartition.RANDOM);
   }
 
+  /**
+   * Create an Iceberg Metadata scan fragment.
+   */
+  private PlanFragment createIcebergMetadataScanFragment(PlanNode node) {
+    return new PlanFragment(ctx_.getNextFragmentId(), node, 
DataPartition.UNPARTITIONED);
+  }
+
   /**
    * Adds the SubplanNode as the new plan root to the child fragment and 
returns
    * the child fragment.
diff --git 
a/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java
new file mode 100644
index 000000000..4a776786e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TPlanNode;
+import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.thrift.TScanRangeSpec;
+
+import com.google.common.base.Preconditions;
+
+public class IcebergMetadataScanNode extends ScanNode {
+
+  protected IcebergMetadataScanNode(PlanNodeId id, TupleDescriptor desc) {
+    super(id, desc, "SCAN ICEBERG METADATA");
+  }
+
+  @Override
+  public void init(Analyzer analyzer) throws ImpalaException {
+    super.init(analyzer);
+    scanRangeSpecs_ = new TScanRangeSpec();
+    computeMemLayout(analyzer);
+    computeStats(analyzer);
+  }
+
+  @Override
+  protected String getDisplayLabelDetail() {
+    Preconditions.checkNotNull(desc_.getPath());
+    if (desc_.hasExplicitAlias()) {
+      return desc_.getPath().toString() + " " + desc_.getAlias();
+    } else {
+      return desc_.getPath().toString();
+    }
+  }
+
+  @Override
+  protected String getNodeExplainString(String rootPrefix, String detailPrefix,
+      TExplainLevel detailLevel) {
+    StringBuilder output = new StringBuilder();
+    output.append(String.format("%s%s [%s]\n", rootPrefix, getDisplayLabel(),
+        getDisplayLabelDetail()));
+    return output.toString();
+  }
+
+  @Override
+  protected void toThrift(TPlanNode msg) {
+    // Implement for fragment execution
+  }
+
+  @Override
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
+  }
+
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 59ebb179c..d9b3ac49b 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -310,10 +310,6 @@ abstract public class ScanNode extends PlanNode {
 
   @Override
   protected String getDisplayLabelDetail() {
-    FeTable table = desc_.getTable();
-    List<String> path = new ArrayList<>();
-    path.add(table.getDb().getName());
-    path.add(table.getName());
     Preconditions.checkNotNull(desc_.getPath());
     if (desc_.hasExplicitAlias()) {
       return desc_.getPath().toString() + " " + desc_.getAlias();
@@ -391,7 +387,7 @@ abstract public class ScanNode extends PlanNode {
   /**
    * Estimate per-row cost as 1 per 1KB row size.
    * <p>
-   * This reflect deserialization/copy cost per row.
+   * This reflects deserialization/copy cost per row.
    */
   private float rowMaterializationCost() { return getAvgRowSize() / 1024; }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java 
b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 388d08094..71c7af2e5 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -863,7 +863,6 @@ public class SingleNodePlanner {
             // If the collection is within a (possibly nested) struct, add the 
tuple in
             // which the top level struct is located.
             SlotDescriptor desc = collectionExpr.getDesc();
-            List<TupleDescriptor> enclosingTupleDescs = 
desc.getEnclosingTupleDescs();
             TupleDescriptor topTuple = desc.getTopEnclosingTupleDesc();
             requiredTids.add(topTuple.getId());
           } else {
@@ -1900,6 +1899,14 @@ public class SingleNodePlanner {
     }
   }
 
+  private PlanNode createIcebergMetadataScanNode(TableRef tblRef, Analyzer 
analyzer)
+      throws ImpalaException {
+    IcebergMetadataScanNode icebergMetadataScanNode =
+        new IcebergMetadataScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
+    icebergMetadataScanNode.init(analyzer);
+    return icebergMetadataScanNode;
+  }
+
   /**
    * Returns all applicable conjuncts for join between two plan trees 
'materializing' the
    * given left-hand and right-hand side table ref ids. The conjuncts either 
come from
@@ -2213,9 +2220,7 @@ public class SingleNodePlanner {
       result = new SingularRowSrcNode(ctx_.getNextNodeId(), ctx_.getSubplan());
       result.init(analyzer);
     } else if (tblRef instanceof IcebergMetadataTableRef) {
-      throw new NotImplementedException(String.format("'%s' refers to a 
metadata table "
-          + "which is currently not supported.", String.join(".",
-          tblRef.getPath())));
+      result = createIcebergMetadataScanNode(tblRef, analyzer);
     } else {
       throw new NotImplementedException(
           "Planning not implemented for table ref class: " + 
tblRef.getClass());
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index d51e34897..8829beab9 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -124,6 +124,7 @@ import org.apache.impala.catalog.ImpaladTableUsageTracker;
 import org.apache.impala.catalog.MaterializedViewHdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
@@ -290,9 +291,8 @@ public class Frontend {
       protected long estimated_memory_per_host_ = -1;
 
       // The processing cores required to execute the query.
-      // Certain queries such as EXPLAIN that do not populate
-      // TExecRequest.query_exec_request. Therefore, cores requirement will be 
set here
-      // through setCoresRequired().
+      // Certain queries such as EXPLAIN do not populate 
TExecRequest.query_exec_request.
+      // Therefore, cores requirement will be set here through 
setCoresRequired().
       protected int cores_required_ = -1;
 
       // The initial length of content in explain buffer to help return the 
buffer
@@ -2470,6 +2470,14 @@ public class Frontend {
         return result;
       }
 
+      // Blocking query execution for queries that contain 
IcebergMetadataTables
+      for (FeTable table : stmtTableCache.tables.values()) {
+        if (table instanceof IcebergMetadataTable) {
+          throw new NotImplementedException(String.format("'%s' refers to a 
metadata "
+              + "table which is currently not supported.", 
table.getFullName()));
+        }
+      }
+
       result.setQuery_exec_request(queryExecRequest);
       if (analysisResult.isQueryStmt()) {
         result.stmt_type = TStmtType.QUERY;
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 6b4eba927..a3b91eaf5 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1282,6 +1282,15 @@ public class PlannerTest extends PlannerTestBase {
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
+  /**
+   * Check that Iceberg metadata table scan plans are as expected.
+   */
+  @Test
+  public void testIcebergMetadataTableScans() {
+    runPlannerTestFile("iceberg-metadata-table-scan", "functional_parquet",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
   /**
    * Test limit pushdown into analytic sort in isolation.
    */
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
new file mode 100644
index 000000000..f976cb75e
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
@@ -0,0 +1,114 @@
+explain SELECT * FROM functional_parquet.iceberg_alltypes_part_orc.history
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history]
+   row-size=33B cardinality=unavailable
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history]
+   row-size=33B cardinality=unavailable
+====
+explain select *
+from functional_parquet.iceberg_alltypes_part_orc.history q
+  join functional_parquet.iceberg_alltypes_part_orc.history z
+  on z.snapshot_id = q.snapshot_id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=66B cardinality=unavailable
+|
+|--01:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_alltypes_part_orc.history z]
+|     row-size=33B cardinality=unavailable
+|
+00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history 
q]
+   row-size=33B cardinality=unavailable
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=66B cardinality=unavailable
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_alltypes_part_orc.history z]
+|     row-size=33B cardinality=unavailable
+|
+00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history 
q]
+   row-size=33B cardinality=unavailable
+====
+explain select q.snapshot_id, z.made_current_at as test1, z.made_current_at as 
test2
+from functional_parquet.iceberg_alltypes_part_orc.history q
+  join functional_parquet.iceberg_alltypes_part_orc.history z
+  on z.snapshot_id = q.snapshot_id
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=24B cardinality=unavailable
+|
+|--01:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_alltypes_part_orc.history z]
+|     row-size=16B cardinality=unavailable
+|
+00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history 
q]
+   row-size=8B cardinality=unavailable
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
+|  row-size=24B cardinality=unavailable
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_alltypes_part_orc.history z]
+|     row-size=16B cardinality=unavailable
+|
+00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history 
q]
+   row-size=8B cardinality=unavailable
+====
+explain select * from functional_parquet.iceberg_alltypes_part_orc.manifests 
a, a.partition_summaries
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+|  row-size=98B cardinality=unavailable
+|
+|--04:NESTED LOOP JOIN [CROSS JOIN]
+|  |  row-size=98B cardinality=10
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     row-size=72B cardinality=1
+|  |
+|  03:UNNEST [a.partition_summaries]
+|     row-size=0B cardinality=10
+|
+00:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_alltypes_part_orc.manifests a]
+   row-size=72B cardinality=unavailable
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+01:SUBPLAN
+|  row-size=98B cardinality=unavailable
+|
+|--04:NESTED LOOP JOIN [CROSS JOIN]
+|  |  row-size=98B cardinality=10
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     row-size=72B cardinality=1
+|  |
+|  03:UNNEST [a.partition_summaries]
+|     row-size=0B cardinality=10
+|
+00:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_alltypes_part_orc.manifests a]
+   row-size=72B cardinality=unavailable
+====
\ No newline at end of file

Reply via email to