This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 048b5689fd6adbe7f7e7fda0f04bd95c6f510a45
Author: Steve Carlin <[email protected]>
AuthorDate: Fri Aug 22 10:50:13 2025 -0700

    IMPALA-14080: Support LocalFsTable table types in Calcite planner.
    
    IMPALA-13947 changed the use_local_catalog default to true. This causes
    failures when the use_calcite_planner query option is set to true.
    
    The Calcite planner previously handled only HdfsTable table types. It
    now handles LocalFsTable table types as well.
    
    Currently, if the table's row count is missing from its stats, the
    Calcite planner loads all partitions and iterates over them to estimate
    it. This is inefficient in local catalog mode and ideally should happen
    later, after partition pruning. Follow-up work is needed to improve this.
    
    Testing:
    Re-enable local catalog mode in:
    TestCalcitePlanner.test_calcite_frontend
    TestWorkloadManagementSQLDetailsCalcite.test_tpcds_8_decimal
    
    Co-authored-by: Riza Suminto
    
    Change-Id: Ic855779aa64d11b7a8b19dd261c0164e65604e44
    Reviewed-on: http://gerrit.cloudera.org:8080/23341
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/calcite/rel/node/ImpalaHdfsScanRel.java | 13 +++----
 .../apache/impala/calcite/schema/CalciteDb.java    | 40 ++++++++++---------
 .../apache/impala/calcite/schema/CalciteTable.java | 45 +++++++++++-----------
 .../impala/calcite/service/ExecRequestCreator.java | 14 +++----
 tests/custom_cluster/test_calcite_planner.py       |  5 +--
 .../test_workload_mgmt_sql_details.py              |  2 -
 6 files changed, 57 insertions(+), 62 deletions(-)

diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
index eaa2532c9..a9b239f95 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java
@@ -27,8 +27,8 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BaseTableRef;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.analysis.Path;
 import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.Path;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TupleDescriptor;
@@ -39,22 +39,21 @@ import org.apache.impala.calcite.schema.CalciteTable;
 import org.apache.impala.calcite.util.SimplifiedAnalyzer;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsPartition;
-import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.UnsupportedFeatureException;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.planner.PlanNodeId;
-import org.apache.impala.planner.SingleNodePlanner;
 import org.apache.impala.planner.ScanNode;
+import org.apache.impala.planner.SingleNodePlanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 /**
  * ImpalaHdfsScanRel. Calcite RelNode which maps to an Impala TableScan node.
  */
@@ -138,7 +137,7 @@ public class ImpalaHdfsScanRel extends TableScan
   private List<Expr> createScanOutputExprs(List<SlotDescriptor> slotDescs)
       throws ImpalaException {
     CalciteTable calciteTable = (CalciteTable) getTable();
-    HdfsTable table = calciteTable.getHdfsTable();
+    FeFsTable table = calciteTable.getFeFsTable();
     // IMPALA-12961: The output expressions are contained in a list which
     // may have holes in it (if the table scan column is not in the output).
     // The width of the list must include all columns, including the acid ones,
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
index 9f327352f..3b44036ff 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
@@ -17,24 +17,25 @@
 
 package org.apache.impala.calcite.schema;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeImpl;
 import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.ViewTable;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.ViewTable;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.View;
+import org.apache.impala.catalog.local.LocalFsTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.UnsupportedFeatureException;
 
-import com.google.common.collect.ImmutableList;
-
 import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.Map;
@@ -65,32 +66,33 @@ public class CalciteDb extends AbstractSchema {
         Analyzer analyzer) throws ImpalaException {
       if (tableMap_.containsKey(tableName)) return this;
 
-      if (table instanceof HdfsTable) {
-          tableMap_.put(tableName.toLowerCase(),
-              new CalciteTable(table, reader_, analyzer));
-          return this;
+      if (table instanceof LocalFsTable || table instanceof HdfsTable) {
+        tableMap_.put(
+            tableName.toLowerCase(), new CalciteTable(table, reader_, 
analyzer));
+        return this;
       }
 
-      if (table instanceof View) {
-          tableMap_.put(tableName.toLowerCase(), createViewTable(table));
-          return this;
+      if (table instanceof FeView) {
+        tableMap_.put(tableName.toLowerCase(), createViewTable(table));
+        return this;
       }
 
-      throw new UnsupportedFeatureException(
-          "Table " + table.getFullName() + " has unsupported type " +
-              table.getClass().getSimpleName() + ". The Calcite planner only 
supports " +
-              "HdfsTable's and View's.");
+      throw new UnsupportedFeatureException("Table " + table.getFullName()
+          + " has unsupported type " + table.getClass().getSimpleName()
+          + ". The Calcite planner only supports "
+          + "HdfsTable's and FeView's.");
     }
 
     private static ViewTable createViewTable(FeTable feTable) throws 
ImpalaException {
       RelDataType rowType = CalciteTable.buildColumnsForRelDataType(feTable);
       JavaTypeFactory typeFactory = (JavaTypeFactory) 
ImpalaTypeSystemImpl.TYPE_FACTORY;
       Type elementType = typeFactory.getJavaClass(rowType);
-      return new ViewTable(elementType,
-          RelDataTypeImpl.proto(rowType), ((View) 
feTable).getQueryStmt().toSql(),
+      return new ViewTable(elementType, RelDataTypeImpl.proto(rowType),
+          ((FeView) feTable).getQueryStmt().toSql(),
           /* schemaPath */ ImmutableList.of(),
-          /* viewPath */ 
ImmutableList.of(feTable.getDb().getName().toLowerCase(),
-          feTable.getName().toLowerCase()));
+          /* viewPath */
+          ImmutableList.of(
+              feTable.getDb().getName().toLowerCase(), 
feTable.getName().toLowerCase()));
     }
 
     public CalciteDb build() {
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
index 2ed7caaf3..56f1e1f7e 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java
@@ -17,6 +17,9 @@
 
 package org.apache.impala.calcite.schema;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.RelOptAbstractTable;
@@ -47,39 +50,36 @@ import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.calcite.rel.util.ImpalaBaseTableRef;
+import org.apache.impala.calcite.type.ImpalaTypeConverter;
+import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
+import org.apache.impala.calcite.util.SimplifiedAnalyzer;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsPartition;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.IcebergTable;
-import org.apache.impala.calcite.rel.util.ImpalaBaseTableRef;
-import org.apache.impala.calcite.type.ImpalaTypeConverter;
-import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
-import org.apache.impala.calcite.util.SimplifiedAnalyzer;
-import org.apache.impala.planner.HdfsEstimatedMissingTableStats;
-import org.apache.impala.planner.HdfsPartitionPruner;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.UnsupportedFeatureException;
+import org.apache.impala.planner.HdfsEstimatedMissingTableStats;
+import org.apache.impala.planner.HdfsPartitionPruner;
 import org.apache.impala.util.AcidUtils;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-
 public class CalciteTable extends RelOptAbstractTable
     implements Table, Prepare.PreparingTable {
-
-  private final HdfsTable table_;
+  private final FeFsTable table_;
 
   private final Map<Integer, Integer> impalaPositionMap_;
 
@@ -94,15 +94,18 @@ public class CalciteTable extends RelOptAbstractTable
   public CalciteTable(FeTable table, CalciteCatalogReader reader,
       Analyzer analyzer) throws ImpalaException {
     super(reader, table.getName(), buildColumnsForRelDataType(table));
-    this.table_ = (HdfsTable) table;
+    this.table_ = (FeFsTable) table;
     this.qualifiedTableName_ = table.getTableName().toPath();
     this.columns_ = table.getColumnsInHiveOrder();
     this.impalaPositionMap_ = buildPositionMap();
     this.analyzer_ = (SimplifiedAnalyzer) analyzer;
-    estimatedMissingStats_ = table_.getNumRows() < 0
-        ? new HdfsEstimatedMissingTableStats(analyzer.getQueryOptions(), 
table_,
-            table_.getPartitions(), -1)
-        : null;
+    // TODO: If table_.getNumRows() is unknown (-1), this logic will load all 
partitions
+    // to compute estimation using HdfsEstimatedMissingTableStats. This is 
potentially
+    // expensive and should be avoided in local catalog mode.
+    estimatedMissingStats_ = table_.getNumRows() < 0 ?
+        new HdfsEstimatedMissingTableStats(
+            analyzer.getQueryOptions(), table_, table_.loadAllPartitions(), 
-1) :
+        null;
 
     checkIfTableIsSupported(table);
   }
@@ -131,11 +134,10 @@ public class CalciteTable extends RelOptAbstractTable
       throw new UnsupportedFeatureException("Views are not supported yet.");
     }
 
-    if (!(table instanceof HdfsTable)) {
+    if (!(table instanceof FeFsTable)) {
       String tableType = table.getClass().getSimpleName().replace("Table", "");
       throw new UnsupportedFeatureException(tableType + " tables are not 
supported yet.");
     }
-
   }
 
   public BaseTableRef createBaseTableRef(SimplifiedAnalyzer analyzer
@@ -165,9 +167,7 @@ public class CalciteTable extends RelOptAbstractTable
     return impalaPair.first;
   }
 
-  public HdfsTable getHdfsTable() {
-    return table_;
-  }
+  public FeFsTable getFeFsTable() { return table_; }
 
   @Override
   public List<String> getQualifiedName() {
@@ -247,7 +247,6 @@ public class CalciteTable extends RelOptAbstractTable
   }
 
   public Column getColumn(int i) {
-    HdfsTable feFsTable = (HdfsTable) table_;
     return columns_.get(i);
   }
 
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
index ef299a649..b0dd1ec29 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java
@@ -17,12 +17,14 @@
 
 package org.apache.impala.calcite.service;
 
-import org.apache.impala.calcite.rel.node.NodeWithExprs;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.JoinOperator;
+import org.apache.impala.calcite.rel.node.NodeWithExprs;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.common.ImpalaException;
@@ -33,9 +35,9 @@ import org.apache.impala.planner.NestedLoopJoinNode;
 import org.apache.impala.planner.ParallelPlanner;
 import org.apache.impala.planner.PlanFragment;
 import org.apache.impala.planner.PlanNode;
+import org.apache.impala.planner.PlanRootSink;
 import org.apache.impala.planner.Planner;
 import org.apache.impala.planner.PlannerContext;
-import org.apache.impala.planner.PlanRootSink;
 import org.apache.impala.planner.RuntimeFilterGenerator;
 import org.apache.impala.planner.SingleNodePlanner;
 import org.apache.impala.planner.SingularRowSrcNode;
@@ -55,19 +57,17 @@ import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TStmtType;
 import org.apache.impala.util.EventSequence;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-
 /**
  * ExecRequestCreator. Responsible for taking a PlanNode and the output Expr 
list
  *  from the top level PlanNode and convert it into a TExecRequest thrift 
object
@@ -329,8 +329,8 @@ public class ExecRequestCreator implements CompilerStep {
   private List<TNetworkAddress> getHostLocations(Collection<FeTable> tables) {
     Set<TNetworkAddress> hostLocations = new HashSet<>();
     for (FeTable table : tables) {
-      if (table instanceof HdfsTable) {
-        hostLocations.addAll(((HdfsTable) table).getHostIndex().getList());
+      if (table instanceof FeFsTable) {
+        hostLocations.addAll(((FeFsTable) table).getHostIndex().getList());
       }
     }
     return new ArrayList<>(hostLocations);
diff --git a/tests/custom_cluster/test_calcite_planner.py 
b/tests/custom_cluster/test_calcite_planner.py
index 4288d20dc..97b2ead67 100644
--- a/tests/custom_cluster/test_calcite_planner.py
+++ b/tests/custom_cluster/test_calcite_planner.py
@@ -37,10 +37,7 @@ class TestCalcitePlanner(CustomClusterTestSuite):
     add_mandatory_exec_option(cls, 'use_calcite_planner', 'true')
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      start_args="--env_vars=USE_CALCITE_PLANNER=true",
-      impalad_args="--use_local_catalog=false",
-      catalogd_args="--catalog_topic_mode=full")
+  
@CustomClusterTestSuite.with_args(start_args="--env_vars=USE_CALCITE_PLANNER=true")
   def test_calcite_frontend(self, vector, unique_database):
     """Calcite planner does not work in local catalog mode yet."""
     self.run_test_case('QueryTest/calcite', vector, use_db=unique_database)
diff --git a/tests/custom_cluster/test_workload_mgmt_sql_details.py 
b/tests/custom_cluster/test_workload_mgmt_sql_details.py
index 5d9d2a882..8134e896d 100644
--- a/tests/custom_cluster/test_workload_mgmt_sql_details.py
+++ b/tests/custom_cluster/test_workload_mgmt_sql_details.py
@@ -429,8 +429,6 @@ class 
TestWorkloadManagementSQLDetailsCalcite(WorkloadManagementTestSuite):
 
   @CustomClusterTestSuite.with_args(
       start_args="--use_calcite_planner=true",
-      impalad_args="--use_local_catalog=false",
-      catalogd_args="--catalog_topic_mode=full",
       cluster_size=1, workload_mgmt=True)
   def test_tpcds_8_decimal(self, vector):
     """Runs the tpcds-decimal_v2-q8 query using the calcite planner and 
asserts the query

Reply via email to