This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 048b5689fd6adbe7f7e7fda0f04bd95c6f510a45 Author: Steve Carlin <[email protected]> AuthorDate: Fri Aug 22 10:50:13 2025 -0700 IMPALA-14080: Support LocalFsTable table types in Calcite planner. IMPALA-13947 changes the use_local_catalog default to true. This causes failures when the use_calcite_planner query option is set to true, because the Calcite planner was only handling HdfsTable table types. It will now handle LocalFsTable table types as well. Currently, if the table's row count is missing, the Calcite planner loads all partitions and iterates over them to estimate it. This is inefficient in local catalog mode and should ideally happen later, after partition pruning. Follow-up work is needed to improve this. Testing: Re-enable local catalog mode in TestCalcitePlanner.test_calcite_frontend and TestWorkloadManagementSQLDetailsCalcite.test_tpcds_8_decimal Co-authored-by: Riza Suminto Change-Id: Ic855779aa64d11b7a8b19dd261c0164e65604e44 Reviewed-on: http://gerrit.cloudera.org:8080/23341 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/calcite/rel/node/ImpalaHdfsScanRel.java | 13 +++---- .../apache/impala/calcite/schema/CalciteDb.java | 40 ++++++++++--------- .../apache/impala/calcite/schema/CalciteTable.java | 45 +++++++++++----------- .../impala/calcite/service/ExecRequestCreator.java | 14 +++---- tests/custom_cluster/test_calcite_planner.py | 5 +-- .../test_workload_mgmt_sql_details.py | 2 - 6 files changed, 57 insertions(+), 62 deletions(-) diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java index eaa2532c9..a9b239f95 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java @@ -27,8 +27,8 @@
import org.apache.calcite.util.ImmutableBitSet; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BaseTableRef; import org.apache.impala.analysis.Expr; -import org.apache.impala.analysis.Path; import org.apache.impala.analysis.ExprSubstitutionMap; +import org.apache.impala.analysis.Path; import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.TupleDescriptor; @@ -39,22 +39,21 @@ import org.apache.impala.calcite.schema.CalciteTable; import org.apache.impala.calcite.util.SimplifiedAnalyzer; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.FeFsPartition; -import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.UnsupportedFeatureException; import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.PlanNodeId; -import org.apache.impala.planner.SingleNodePlanner; import org.apache.impala.planner.ScanNode; +import org.apache.impala.planner.SingleNodePlanner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * ImpalaHdfsScanRel. Calcite RelNode which maps to an Impala TableScan node. */ @@ -138,7 +137,7 @@ public class ImpalaHdfsScanRel extends TableScan private List<Expr> createScanOutputExprs(List<SlotDescriptor> slotDescs) throws ImpalaException { CalciteTable calciteTable = (CalciteTable) getTable(); - HdfsTable table = calciteTable.getHdfsTable(); + FeFsTable table = calciteTable.getFeFsTable(); // IMPALA-12961: The output expressions are contained in a list which // may have holes in it (if the table scan column is not in the output). 
// The width of the list must include all columns, including the acid ones, diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java index 9f327352f..3b44036ff 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java @@ -17,24 +17,25 @@ package org.apache.impala.calcite.schema; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; + import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.prepare.CalciteCatalogReader; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeImpl; import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.ViewTable; import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.ViewTable; import org.apache.impala.analysis.Analyzer; import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.FeView; import org.apache.impala.catalog.HdfsTable; -import org.apache.impala.catalog.View; +import org.apache.impala.catalog.local.LocalFsTable; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.UnsupportedFeatureException; -import com.google.common.collect.ImmutableList; - import java.lang.reflect.Type; import java.util.HashMap; import java.util.Map; @@ -65,32 +66,33 @@ public class CalciteDb extends AbstractSchema { Analyzer analyzer) throws ImpalaException { if (tableMap_.containsKey(tableName)) return this; - if (table instanceof HdfsTable) { - tableMap_.put(tableName.toLowerCase(), - new CalciteTable(table, reader_, analyzer)); - return this; + if (table instanceof LocalFsTable || table instanceof HdfsTable) { + tableMap_.put( + 
tableName.toLowerCase(), new CalciteTable(table, reader_, analyzer)); + return this; } - if (table instanceof View) { - tableMap_.put(tableName.toLowerCase(), createViewTable(table)); - return this; + if (table instanceof FeView) { + tableMap_.put(tableName.toLowerCase(), createViewTable(table)); + return this; } - throw new UnsupportedFeatureException( - "Table " + table.getFullName() + " has unsupported type " + - table.getClass().getSimpleName() + ". The Calcite planner only supports " + - "HdfsTable's and View's."); + throw new UnsupportedFeatureException("Table " + table.getFullName() + + " has unsupported type " + table.getClass().getSimpleName() + + ". The Calcite planner only supports " + + "HdfsTable's and FeView's."); } private static ViewTable createViewTable(FeTable feTable) throws ImpalaException { RelDataType rowType = CalciteTable.buildColumnsForRelDataType(feTable); JavaTypeFactory typeFactory = (JavaTypeFactory) ImpalaTypeSystemImpl.TYPE_FACTORY; Type elementType = typeFactory.getJavaClass(rowType); - return new ViewTable(elementType, - RelDataTypeImpl.proto(rowType), ((View) feTable).getQueryStmt().toSql(), + return new ViewTable(elementType, RelDataTypeImpl.proto(rowType), + ((FeView) feTable).getQueryStmt().toSql(), /* schemaPath */ ImmutableList.of(), - /* viewPath */ ImmutableList.of(feTable.getDb().getName().toLowerCase(), - feTable.getName().toLowerCase())); + /* viewPath */ + ImmutableList.of( + feTable.getDb().getName().toLowerCase(), feTable.getName().toLowerCase())); } public CalciteDb build() { diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java index 2ed7caaf3..56f1e1f7e 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java @@ -17,6 +17,9 @@ package 
org.apache.impala.calcite.schema; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.plan.RelOptAbstractTable; @@ -47,39 +50,36 @@ import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.TableRef; import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.calcite.rel.util.ImpalaBaseTableRef; +import org.apache.impala.calcite.type.ImpalaTypeConverter; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.calcite.util.SimplifiedAnalyzer; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.IcebergTable; -import org.apache.impala.calcite.rel.util.ImpalaBaseTableRef; -import org.apache.impala.calcite.type.ImpalaTypeConverter; -import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; -import org.apache.impala.calcite.util.SimplifiedAnalyzer; -import org.apache.impala.planner.HdfsEstimatedMissingTableStats; -import org.apache.impala.planner.HdfsPartitionPruner; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.Pair; import org.apache.impala.common.UnsupportedFeatureException; +import org.apache.impala.planner.HdfsEstimatedMissingTableStats; +import org.apache.impala.planner.HdfsPartitionPruner; import org.apache.impala.util.AcidUtils; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - import java.util.ArrayList; +import java.util.Collection; import 
java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; - public class CalciteTable extends RelOptAbstractTable implements Table, Prepare.PreparingTable { - - private final HdfsTable table_; + private final FeFsTable table_; private final Map<Integer, Integer> impalaPositionMap_; @@ -94,15 +94,18 @@ public class CalciteTable extends RelOptAbstractTable public CalciteTable(FeTable table, CalciteCatalogReader reader, Analyzer analyzer) throws ImpalaException { super(reader, table.getName(), buildColumnsForRelDataType(table)); - this.table_ = (HdfsTable) table; + this.table_ = (FeFsTable) table; this.qualifiedTableName_ = table.getTableName().toPath(); this.columns_ = table.getColumnsInHiveOrder(); this.impalaPositionMap_ = buildPositionMap(); this.analyzer_ = (SimplifiedAnalyzer) analyzer; - estimatedMissingStats_ = table_.getNumRows() < 0 - ? new HdfsEstimatedMissingTableStats(analyzer.getQueryOptions(), table_, - table_.getPartitions(), -1) - : null; + // TODO: If table_.getNumRows() is unknown (-1), this logic will load all partitions + // to compute estimation using HdfsEstimatedMissingTableStats. This is potentially + // expensive and should be avoided in local catalog mode. + estimatedMissingStats_ = table_.getNumRows() < 0 ? 
+ new HdfsEstimatedMissingTableStats( + analyzer.getQueryOptions(), table_, table_.loadAllPartitions(), -1) : + null; checkIfTableIsSupported(table); } @@ -131,11 +134,10 @@ public class CalciteTable extends RelOptAbstractTable throw new UnsupportedFeatureException("Views are not supported yet."); } - if (!(table instanceof HdfsTable)) { + if (!(table instanceof FeFsTable)) { String tableType = table.getClass().getSimpleName().replace("Table", ""); throw new UnsupportedFeatureException(tableType + " tables are not supported yet."); } - } public BaseTableRef createBaseTableRef(SimplifiedAnalyzer analyzer @@ -165,9 +167,7 @@ public class CalciteTable extends RelOptAbstractTable return impalaPair.first; } - public HdfsTable getHdfsTable() { - return table_; - } + public FeFsTable getFeFsTable() { return table_; } @Override public List<String> getQualifiedName() { @@ -247,7 +247,6 @@ public class CalciteTable extends RelOptAbstractTable } public Column getColumn(int i) { - HdfsTable feFsTable = (HdfsTable) table_; return columns_.get(i); } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java index ef299a649..b0dd1ec29 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java @@ -17,12 +17,14 @@ package org.apache.impala.calcite.service; -import org.apache.impala.calcite.rel.node.NodeWithExprs; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; + import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.JoinOperator; +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeTable; import 
org.apache.impala.catalog.HdfsTable; import org.apache.impala.common.ImpalaException; @@ -33,9 +35,9 @@ import org.apache.impala.planner.NestedLoopJoinNode; import org.apache.impala.planner.ParallelPlanner; import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.PlanRootSink; import org.apache.impala.planner.Planner; import org.apache.impala.planner.PlannerContext; -import org.apache.impala.planner.PlanRootSink; import org.apache.impala.planner.RuntimeFilterGenerator; import org.apache.impala.planner.SingleNodePlanner; import org.apache.impala.planner.SingularRowSrcNode; @@ -55,19 +57,17 @@ import org.apache.impala.thrift.TRuntimeFilterMode; import org.apache.impala.thrift.TRuntimeProfileNode; import org.apache.impala.thrift.TStmtType; import org.apache.impala.util.EventSequence; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; - /** * ExecRequestCreator. 
Responsible for taking a PlanNode and the output Expr list * from the top level PlanNode and convert it into a TExecRequest thrift object @@ -329,8 +329,8 @@ public class ExecRequestCreator implements CompilerStep { private List<TNetworkAddress> getHostLocations(Collection<FeTable> tables) { Set<TNetworkAddress> hostLocations = new HashSet<>(); for (FeTable table : tables) { - if (table instanceof HdfsTable) { - hostLocations.addAll(((HdfsTable) table).getHostIndex().getList()); + if (table instanceof FeFsTable) { + hostLocations.addAll(((FeFsTable) table).getHostIndex().getList()); } } return new ArrayList<>(hostLocations); diff --git a/tests/custom_cluster/test_calcite_planner.py b/tests/custom_cluster/test_calcite_planner.py index 4288d20dc..97b2ead67 100644 --- a/tests/custom_cluster/test_calcite_planner.py +++ b/tests/custom_cluster/test_calcite_planner.py @@ -37,10 +37,7 @@ class TestCalcitePlanner(CustomClusterTestSuite): add_mandatory_exec_option(cls, 'use_calcite_planner', 'true') @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - start_args="--env_vars=USE_CALCITE_PLANNER=true", - impalad_args="--use_local_catalog=false", - catalogd_args="--catalog_topic_mode=full") + @CustomClusterTestSuite.with_args(start_args="--env_vars=USE_CALCITE_PLANNER=true") def test_calcite_frontend(self, vector, unique_database): """Calcite planner does not work in local catalog mode yet.""" self.run_test_case('QueryTest/calcite', vector, use_db=unique_database) diff --git a/tests/custom_cluster/test_workload_mgmt_sql_details.py b/tests/custom_cluster/test_workload_mgmt_sql_details.py index 5d9d2a882..8134e896d 100644 --- a/tests/custom_cluster/test_workload_mgmt_sql_details.py +++ b/tests/custom_cluster/test_workload_mgmt_sql_details.py @@ -429,8 +429,6 @@ class TestWorkloadManagementSQLDetailsCalcite(WorkloadManagementTestSuite): @CustomClusterTestSuite.with_args( start_args="--use_calcite_planner=true", - impalad_args="--use_local_catalog=false", - 
catalogd_args="--catalog_topic_mode=full", cluster_size=1, workload_mgmt=True) def test_tpcds_8_decimal(self, vector): """Runs the tpcds-decimal_v2-q8 query using the calcite planner and asserts the query
