This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0b3bc412f2b09f4818cb0c50fccec5cfb3ddfe8d
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Wed Mar 5 16:45:19 2025 +0100

    IMPALA-13738 (Part1): Refactor IcebergTable.getPartialInfo()

    This patch aims to refactor Iceberg tables in the Catalog.
    It reduces the dependency on the internal hdfsTable_ object
    in IcebergTable.getPartialInfo().
    Now we depend less on HdfsTable when we load an Iceberg table
    in local catalog mode. The remaining dependencies will be
    addressed in following patches.

    Now creating TPartialPartitionInfo is the responsibility of the
    partitions, not the tables:
     - FeFsPartition holds a basic default implementation for the
       unpartitioned tables, which includes all Iceberg tables
       (we handle Iceberg tables similarly to unpartitioned HDFS tables
       in the Catalog).
     - HdfsPartition extends this implementation with HDFS and
       Hive ACID specific fields.

    Testing:
     - existing Iceberg e2e tests in local catalog mode
     - existing Hive ACID tests in local catalog mode
     - ran test_iceberg_with_puffin.py in local catalog mode
     - LocalCatalog FE test

    Change-Id: I75d277fb555d0088a2e28111ff529a4605d734fa
    Reviewed-on: http://gerrit.cloudera.org:8080/22605
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/catalog/FeFsPartition.java | 42 +++++++++++
 .../java/org/apache/impala/catalog/FeFsTable.java | 7 ++
 .../org/apache/impala/catalog/HdfsPartition.java | 81 ++++++++++++++++++----
 .../java/org/apache/impala/catalog/HdfsTable.java | 75 ++++----------------
 .../org/apache/impala/catalog/IcebergTable.java | 73 +++++++++++++++----
 .../apache/impala/catalog/local/LocalFsTable.java | 5 ++
 6 files changed, 194 insertions(+), 89 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index bcec2a634..40fba33c1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java @@ -17,6 +17,7 @@ package org.apache.impala.catalog; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -28,8 +29,10 @@ import org.apache.hadoop.fs.Path; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.thrift.TAccessLevel; +import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; import org.apache.impala.thrift.THdfsPartitionLocation; import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.thrift.TPartialPartitionInfo; import org.apache.impala.thrift.TPartitionStats; import org.apache.impala.util.ListMap; @@ -209,4 +212,43 @@ public interface FeFsPartition { * Returns new FeFsPartition that has the delete delta descriptors as file descriptors. */ FeFsPartition genDeleteDeltaPartition(); + + /** + * Creates TPartialPartitionInfo for the default partition (the one and only partition + * that unpartitioned tables have). + */ + default TPartialPartitionInfo getDefaultPartialPartitionInfo( + TGetPartialCatalogObjectRequest req) { + + TPartialPartitionInfo partInfo = new TPartialPartitionInfo(getId()); + + if (req.table_info_selector.want_partition_names) { + partInfo.setName(getPartitionName()); + } + + if (req.table_info_selector.want_partition_metadata) { + // We do not need to set partition metadata for unpartitioned tables. 
+ partInfo.setHas_incremental_stats(hasIncrementalStats()); + } + + if (req.table_info_selector.want_partition_files) { + partInfo.setLast_compaction_id(-1); + partInfo.insert_file_descriptors = new ArrayList<>(); + partInfo.delete_file_descriptors = new ArrayList<>(); + partInfo.file_descriptors = new ArrayList<>(); + if (!getTable().isHiveAcid()) { + for (FileDescriptor fd: getFileDescriptors()) { + partInfo.file_descriptors.add(fd.toThrift()); + } + } + } + + if (req.table_info_selector.want_partition_stats) { + partInfo.setPartition_stats(getPartitionStatsCompressed()); + } + + partInfo.setIs_marked_cached(isMarkedCached()); + + return partInfo; + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java index b4d464b4f..5d66c8e74 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java @@ -277,6 +277,13 @@ public interface FeFsTable extends FeTable { */ SqlConstraints getSqlConstraints(); + /** + * @return whether it is a Hive ACID table. 
+ */ + default boolean isHiveAcid() { + return false; + } + default FileSystem getFileSystem() throws CatalogException { FileSystem tableFs; try { diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index c3eef49fe..7cece6268 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -40,6 +40,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.LiteralExpr; @@ -58,12 +59,14 @@ import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TExpr; import org.apache.impala.thrift.TExprNode; +import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; import org.apache.impala.thrift.THdfsFileDesc; import org.apache.impala.thrift.THdfsPartition; import org.apache.impala.thrift.THdfsPartitionLocation; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TPartialPartitionInfo; import org.apache.impala.thrift.TPartitionStats; +import org.apache.impala.util.AcidUtils; import org.apache.impala.util.HdfsCachingUtil; import org.apache.impala.util.ListMap; import org.slf4j.Logger; @@ -592,6 +595,70 @@ public class HdfsPartition extends CatalogObjectImpl encodedDeleteFileDescriptors_.size(); } + /** + * Returns the requested partitions info and the number of files filtered out based on + * the ACID writeIdList. + * For non-ACID tables this number is always 0. This number being null indicates that + * the file descriptors are missing from the Catalog. 
+ */ + public Pair<TPartialPartitionInfo, Integer> getPartialPartitionInfo( + TGetPartialCatalogObjectRequest req, ValidWriteIdList reqWriteIdList) { + + TPartialPartitionInfo partInfo = + FeFsPartition.super.getDefaultPartialPartitionInfo(req); + // Add HdfsPartition specific information to the basic default partition info. + if (req.table_info_selector.want_hms_partition) { + partInfo.hms_partition = toHmsPartition(); + } + // The special "prototype partition" or the only partition of an unpartitioned table + // don't have partition metadata. + if (req.table_info_selector.want_partition_metadata + && table_.isPartitioned() + && id_ != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) { + // Don't need to make a copy here since the HMS parameters shouldn't change during + // the invocation of getPartialPartitionInfo(). + partInfo.hms_parameters = getParameters(); + partInfo.write_id = writeId_; + partInfo.hdfs_storage_descriptor = fileFormatDescriptor_.toThrift(); + partInfo.location = getLocationAsThrift(); + } + int numFilesFiltered = 0; + if (req.table_info_selector.want_partition_files) { + partInfo.setLast_compaction_id(getLastCompactionId()); + if (table_.isHiveAcid()) { + try { + if (!getInsertFileDescriptors().isEmpty()) { + numFilesFiltered += addFilteredFds(getInsertFileDescriptors(), + partInfo.insert_file_descriptors, reqWriteIdList); + numFilesFiltered += addFilteredFds(getDeleteFileDescriptors(), + partInfo.delete_file_descriptors, reqWriteIdList); + } else { + numFilesFiltered += addFilteredFds(getFileDescriptors(), + partInfo.file_descriptors, reqWriteIdList); + } + } catch (CatalogException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Could not use cached file descriptors of partition {} of table {}" + + " for writeIdList {}", getPartitionName(), getTable().getFullName(), + reqWriteIdList, ex); + } + return new Pair<>(partInfo, null); + } + } + } + return new Pair<>(partInfo, numFilesFiltered); + } + + private int 
addFilteredFds(List<FileDescriptor> fds, List<THdfsFileDesc> thriftFds, + ValidWriteIdList writeIdList) throws CatalogException { + List<FileDescriptor> filteredFds = new ArrayList<>(fds); + int numFilesFiltered = AcidUtils.filterFdsForAcidState(filteredFds, writeIdList); + for (FileDescriptor fd: filteredFds) { + thriftFds.add(fd.toThrift()); + } + return numFilesFiltered; + } + public FileMetadataStats getFileMetadataStats() { return fileMetadataStats_; } @@ -606,20 +673,6 @@ public class HdfsPartition extends CatalogObjectImpl return cachedMsPartitionDescriptor_; } - public void setPartitionMetadata(TPartialPartitionInfo tPart) { - // The special "prototype partition" or the only partition of an unpartitioned table - // don't have partition metadata. - if (id_ == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID - || !table_.isPartitioned()) { - return; - } - // Don't need to make a copy here since the caller should not modify the parameters. - tPart.hms_parameters = getParameters(); - tPart.write_id = writeId_; - tPart.hdfs_storage_descriptor = fileFormatDescriptor_.toThrift(); - tPart.location = getLocationAsThrift(); - } - /** * Returns a Hive-compatible partition object that may be used in calls to the * metastore. 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 020df5e1d..00994419b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -75,7 +75,6 @@ import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; import org.apache.impala.thrift.TGetPartialCatalogObjectResponse; -import org.apache.impala.thrift.THdfsFileDesc; import org.apache.impala.thrift.THdfsPartition; import org.apache.impala.thrift.THdfsTable; import org.apache.impala.thrift.TNetworkAddress; @@ -461,6 +460,12 @@ public class HdfsTable extends Table implements FeFsTable { return null; } + @Override + public boolean isHiveAcid() { + if (validWriteIds_ != null) return true; + return AcidUtils.isTransactionalTable(msTable_.getParameters()); + } + /** * Marks a partition dirty by registering the partition builder for its new instance. 
*/ @@ -2293,56 +2298,16 @@ public class HdfsTable extends Table implements FeFsTable { return new TGetPartialCatalogObjectResponse().setLookup_status( CatalogLookupStatus.PARTITION_NOT_FOUND); } - TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId); - - if (req.table_info_selector.want_partition_names) { - partInfo.setName(part.getPartitionName()); - } - - if (req.table_info_selector.want_partition_metadata) { - part.setPartitionMetadata(partInfo); - partInfo.setHas_incremental_stats(part.hasIncrementalStats()); - } - if (req.table_info_selector.want_hms_partition) { - partInfo.hms_partition = part.toHmsPartition(); - } - - if (req.table_info_selector.want_partition_files) { - partInfo.setLast_compaction_id(part.getLastCompactionId()); - try { - if (!part.getInsertFileDescriptors().isEmpty()) { - partInfo.file_descriptors = new ArrayList<>(); - partInfo.insert_file_descriptors = new ArrayList<>(); - numFilesFiltered += addFilteredFds(part.getInsertFileDescriptors(), - partInfo.insert_file_descriptors, reqWriteIdList); - partInfo.delete_file_descriptors = new ArrayList<>(); - numFilesFiltered += addFilteredFds(part.getDeleteFileDescriptors(), - partInfo.delete_file_descriptors, reqWriteIdList); - } else { - partInfo.file_descriptors = new ArrayList<>(); - numFilesFiltered += addFilteredFds(part.getFileDescriptors(), - partInfo.file_descriptors, reqWriteIdList); - partInfo.insert_file_descriptors = new ArrayList<>(); - partInfo.delete_file_descriptors = new ArrayList<>(); - } - hits.inc(); - } catch (CatalogException ex) { - if (LOG.isDebugEnabled()) { - LOG.debug("Could not use cached file descriptors of partition {} of table" - + " {} for writeIdList {}", part.getPartitionName(), getFullName(), - reqWriteIdList, ex); - } - misses.inc(); - missingPartitionInfos.put(part, partInfo); - } - } - - if (req.table_info_selector.want_partition_stats) { - partInfo.setPartition_stats(part.getPartitionStatsCompressed()); + Pair<TPartialPartitionInfo, Integer> 
partInfoStatus = + part.getPartialPartitionInfo(req, reqWriteIdList); + if (partInfoStatus.second != null) { + hits.inc(); + numFilesFiltered += partInfoStatus.second; + } else { + misses.inc(); + missingPartitionInfos.put(part, partInfoStatus.first); } - - partInfo.setIs_marked_cached(part.isMarkedCached()); - resp.table_info.partitions.add(partInfo); + resp.table_info.partitions.add(partInfoStatus.first); } } // In most of the cases, the prefix map only contains one item for the table location. @@ -2373,16 +2338,6 @@ public class HdfsTable extends Table implements FeFsTable { return resp; } - private int addFilteredFds(List<FileDescriptor> fds, List<THdfsFileDesc> thriftFds, - ValidWriteIdList writeIdList) throws CatalogException { - List<FileDescriptor> filteredFds = new ArrayList<>(fds); - int numFilesFiltered = AcidUtils.filterFdsForAcidState(filteredFds, writeIdList); - for (FileDescriptor fd: filteredFds) { - thriftFds.add(fd.toThrift()); - } - return numFilesFiltered; - } - private double getFileMetadataCacheHitRate() { long hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC).getCount(); long misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC).getCount(); diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 38d72914c..09d6e8d12 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import 
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -42,6 +44,7 @@ import org.apache.impala.analysis.IcebergPartitionTransform; import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.service.BackendConfig; +import org.apache.impala.thrift.CatalogLookupStatus; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TCompressionCodec; import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; @@ -55,6 +58,7 @@ import org.apache.impala.thrift.TIcebergPartitionSpec; import org.apache.impala.thrift.TIcebergPartitionStats; import org.apache.impala.thrift.TIcebergTable; import org.apache.impala.thrift.TPartialPartitionInfo; +import org.apache.impala.thrift.TSqlConstraints; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTableType; @@ -553,12 +557,6 @@ public class IcebergTable extends Table implements FeIcebergTable { if (!col.getStats().hasNumDistinctValues() || snapshot.timestampMillis() >= hmsStatsTimestampMs) { col.getStats().setNumDistinctValues(ndv); - - // In local catalog mode, the stats sent from the catalog are those of - // 'hdfsTable_', not those of this class. 
- Column hdfsTableCol = hdfsTable_.getColumn(col.getName()); - Preconditions.checkNotNull(hdfsTableCol); - hdfsTableCol.getStats().setNumDistinctValues(ndv); } } } @@ -711,7 +709,7 @@ public class IcebergTable extends Table implements FeIcebergTable { List<TIcebergPartitionSpec> params) { List<IcebergPartitionSpec> ret = new ArrayList<>(); for (TIcebergPartitionSpec param : params) { - // Non-partition iceberg table only has one PartitionSpec with an empty + // Non-partitioned iceberg table only has one PartitionSpec with an empty // PartitionField set and a partition id if (param.getPartition_fields() != null) { List<IcebergPartitionField> fields = new ArrayList<>(); @@ -760,16 +758,61 @@ public class IcebergTable extends Table implements FeIcebergTable { public TGetPartialCatalogObjectResponse getPartialInfo( TGetPartialCatalogObjectRequest req) throws CatalogException { Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName()); - Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos = new HashMap<>(); - TGetPartialCatalogObjectResponse resp = - getHdfsTable().getPartialInfo(req, missingPartialInfos); - if (resp.table_info != null) { - // Clear HdfsTable virtual columns and add IcebergTable virtual columns. 
- resp.table_info.unsetVirtual_columns(); - for (VirtualColumn vCol : getVirtualColumns()) { - resp.table_info.addToVirtual_columns(vCol.toThrift()); + TGetPartialCatalogObjectResponse resp = super.getPartialInfo(req); + Preconditions.checkState(resp.table_info != null); + boolean wantPartitionInfo = req.table_info_selector.want_partition_files + || req.table_info_selector.want_partition_metadata + || req.table_info_selector.want_partition_names + || req.table_info_selector.want_partition_stats; + Preconditions.checkState(!req.table_info_selector.want_hms_partition); + Collection<Long> partIds = req.table_info_selector.partition_ids; + + if (partIds != null && partIds.isEmpty()) { + resp.table_info.partitions = Lists.newArrayListWithCapacity(0); + } else if (wantPartitionInfo || partIds != null) { + // Caller specified at least one piece of partition info. If they didn't explicitly + // specify the partitions, it means that they want the info for all partitions. + // (Iceberg tables are handled as unpartitioned tables, having only 1 partition.) + Preconditions.checkState(partIds == null || partIds.size() == 1); + long partId = getPartitionMap().keySet().iterator().next(); + FeFsPartition part = (FeFsPartition) getPartitionMap().get(partId); + if (part == null) { + LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId, + getFullName())); + return new TGetPartialCatalogObjectResponse().setLookup_status( + CatalogLookupStatus.PARTITION_NOT_FOUND); } + TPartialPartitionInfo partInfo = part.getDefaultPartialPartitionInfo(req); + resp.table_info.partitions = Lists.newArrayList(partInfo); + } + + // In most of the cases, the prefix map only contains one item for the table location. + // Here we always send it since it's small. 
+ resp.table_info.setPartition_prefixes( + hdfsTable_.partitionLocationCompressor_.getPrefixes()); + + if (req.table_info_selector.want_partition_files) { + // TODO(todd) we are sending the whole host index even if we returned only + // one file -- maybe not so efficient, but the alternative is to do a bunch + // of cloning of file descriptors which might increase memory pressure. + resp.table_info.setNetwork_addresses(getHostIndex().getList()); } + + if (req.table_info_selector.want_table_constraints) { + TSqlConstraints sqlConstraints = + new TSqlConstraints(getSqlConstraints().getPrimaryKeys(), + getSqlConstraints().getForeignKeys()); + resp.table_info.setSql_constraints(sqlConstraints); + } + // Publish the isMarkedCached_ marker so coordinators don't need to validate + // it again which requires additional HDFS RPCs. + resp.table_info.setIs_marked_cached(isMarkedCached()); + + // Add IcebergTable virtual columns. + for (VirtualColumn vCol : getVirtualColumns()) { + resp.table_info.addToVirtual_columns(vCol.toThrift()); + } + if (req.table_info_selector.want_iceberg_table) { resp.table_info.setIceberg_table(Utils.getTIcebergTable(this)); if (!resp.table_info.isSetNetwork_addresses()) { diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java index 4f5521ddf..bd1113368 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java @@ -297,6 +297,11 @@ public class LocalFsTable extends LocalTable implements FeFsTable { return null; } + @Override + public boolean isHiveAcid() { + return AcidUtils.isTransactionalTable(getMetaStoreTable().getParameters()); + } + @Override public TResultSet getTableStats() { return HdfsTable.getTableStats(this);
