This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 91343369f7f34c0990129d565b9172001ee5570b
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Mon Feb 24 18:22:43 2025 +0100

    IMPALA-13785: Deflake test_catalog_tables_stats in exhaustive build

    IMPALA-13737 changed how we load file metadata for Iceberg tables, as
    we don't rely on the underlying HdfsTable anymore. As a result, the
    file metadata metrics of Iceberg tables (num-files, num-blocks,
    total-file-size-bytes and memory-estimate-bytes) were not updated
    correctly. This patch adds code to update these metrics correctly.

    Testing:
     * executed test_catalog_tables_stats (named in the title) in
       exhaustive mode
     * added Iceberg-specific tests to test_web_pages

    Change-Id: I686e0e32b36ace4c5a8c9eeb715148c98521252a
    Reviewed-on: http://gerrit.cloudera.org:8080/22531
    Reviewed-by: Quanlong Huang <[email protected]>
    Reviewed-by: Noemi Pap-Takacs <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
--- .../java/org/apache/impala/catalog/FeFsTable.java | 67 ++++++++++++++++++++++ .../apache/impala/catalog/FileMetadataLoader.java | 2 +- .../org/apache/impala/catalog/HdfsPartition.java | 4 +- .../java/org/apache/impala/catalog/HdfsTable.java | 65 --------------------- .../impala/catalog/IcebergFileMetadataLoader.java | 2 +- .../org/apache/impala/catalog/IcebergTable.java | 34 ++++++++++- tests/webserver/test_web_pages.py | 23 ++++++++ 7 files changed, 127 insertions(+), 70 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java index 4edb8858f..b4d464b4f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java @@ -82,10 +82,77 @@ public interface FeFsTable extends FeTable { // Internal table property that specifies the total size of the table.
public static final String TOTAL_SIZE = "totalSize"; + // FS table specific metrics + public static final String NUM_FILES_METRIC = "num-files"; + public static final String NUM_BLOCKS_METRIC = "num-blocks"; + public static final String TOTAL_FILE_BYTES_METRIC = "total-file-size-bytes"; + public static final String MEMORY_ESTIMATE_METRIC = "memory-estimate-bytes"; + // Internal table property that specifies the number of erasure coded files in the // table. public static final String NUM_ERASURE_CODED_FILES = "numFilesErasureCoded"; + // Average memory requirements (in bytes) for storing a file descriptor. + public static final long PER_FD_MEM_USAGE_BYTES = 500; + + // Average memory requirements (in bytes) for storing a block. + public static final long PER_BLOCK_MEM_USAGE_BYTES = 150; + + // Represents a set of storage-related statistics aggregated at the table or partition + // level. + public final static class FileMetadataStats { + // Number of files in a table/partition. + public long numFiles = 0; + // Number of blocks in a table/partition. + public long numBlocks = 0; + // Total size (in bytes) of all files in a table/partition. + public long totalFileBytes = 0; + + public FileMetadataStats() {} + + /** + * This constructor allows third party extensions to instantiate a FileMetadataStats + * with a List of FileDescriptor's. + */ + public FileMetadataStats(List<FileDescriptor> fds) { + for (FileDescriptor fd : fds) { + accumulate(fd); + } + } + + // Initializes the values of the storage stats. 
+ public void init() { + numFiles = 0; + numBlocks = 0; + totalFileBytes = 0; + } + + public void set(FileMetadataStats stats) { + numFiles = stats.numFiles; + numBlocks = stats.numBlocks; + totalFileBytes = stats.totalFileBytes; + } + + public void merge(FileMetadataStats other) { + numFiles += other.numFiles; + numBlocks += other.numBlocks; + totalFileBytes += other.totalFileBytes; + } + + public void remove(FileMetadataStats other) { + numFiles -= other.numFiles; + numBlocks -= other.numBlocks; + totalFileBytes -= other.totalFileBytes; + } + + // Accumulate the statistics of the fd into this FileMetadataStats. + public void accumulate(FileDescriptor fd) { + numBlocks += fd.getNumFileBlocks(); + totalFileBytes += fd.getFileLength(); + ++numFiles; + } + } + /** * @return true if the table and all its partitions reside at locations which * support caching (e.g. HDFS). diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java index 214811241..fce49a9fe 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.impala.catalog.HdfsTable.FileMetadataStats; +import org.apache.impala.catalog.FeFsTable.FileMetadataStats; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Reference; import org.apache.impala.thrift.TNetworkAddress; diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 3d337beec..c3eef49fe 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -44,7 
+44,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.analysis.PartitionKeyValue; -import org.apache.impala.catalog.HdfsTable.FileMetadataStats; +import org.apache.impala.catalog.FeFsTable.FileMetadataStats; import org.apache.impala.catalog.events.InFlightEvents; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey; import org.apache.impala.common.FileSystemUtil; @@ -275,7 +275,7 @@ public class HdfsPartition extends CatalogObjectImpl isMarkedCached, accessLevel, hmsParameters, cachedMsPartitionDescriptor, partitionStats, hasIncrementalStats, numRows, writeId, inFlightEvents, /*createEventId=*/-1L, /*lastCompactionId*/-1L, - /*lastRefreshEventId*/-1L, new HdfsTable.FileMetadataStats()); + /*lastRefreshEventId*/-1L, new FileMetadataStats()); } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index b68d21eb4..020df5e1d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -175,19 +175,9 @@ public class HdfsTable extends Table implements FeFsTable { // Average memory requirements (in bytes) for storing the metadata of a partition. private static final long PER_PARTITION_MEM_USAGE_BYTES = 2048; - // Average memory requirements (in bytes) for storing a file descriptor. - private static final long PER_FD_MEM_USAGE_BYTES = 500; - - // Average memory requirements (in bytes) for storing a block. 
- private static final long PER_BLOCK_MEM_USAGE_BYTES = 150; - // Hdfs table specific metrics public static final String CATALOG_UPDATE_DURATION_METRIC = "catalog-update-duration"; public static final String NUM_PARTITIONS_METRIC = "num-partitions"; - public static final String NUM_FILES_METRIC = "num-files"; - public static final String NUM_BLOCKS_METRIC = "num-blocks"; - public static final String TOTAL_FILE_BYTES_METRIC = "total-file-size-bytes"; - public static final String MEMORY_ESTIMATE_METRIC = "memory-estimate-bytes"; public static final String HAS_INCREMENTAL_STATS_METRIC = "has-incremental-stats"; // metrics used to find out the cache hit rate when file-metadata is requested // for a given ValidWriteIdList @@ -343,61 +333,6 @@ public class HdfsTable extends Table implements FeFsTable { skipIcebergFileMetadataLoading_ = skipIcebergFileMetadataLoading; } - // Represents a set of storage-related statistics aggregated at the table or partition - // level. - public final static class FileMetadataStats { - // Number of files in a table/partition. - public long numFiles = 0; - // Number of blocks in a table/partition. - public long numBlocks = 0; - // Total size (in bytes) of all files in a table/partition. - public long totalFileBytes = 0; - - public FileMetadataStats() {} - - /** - * This constructor allows third party extensions to instantiate a FileMetadataStats - * with a List of FileDescriptor's. - */ - public FileMetadataStats(List<FileDescriptor> fds) { - for (FileDescriptor fd : fds) { - accumulate(fd); - } - } - - // Initializes the values of the storage stats. 
- public void init() { - numFiles = 0; - numBlocks = 0; - totalFileBytes = 0; - } - - public void set(FileMetadataStats stats) { - numFiles = stats.numFiles; - numBlocks = stats.numBlocks; - totalFileBytes = stats.totalFileBytes; - } - - public void merge(FileMetadataStats other) { - numFiles += other.numFiles; - numBlocks += other.numBlocks; - totalFileBytes += other.totalFileBytes; - } - - public void remove(FileMetadataStats other) { - numFiles -= other.numFiles; - numBlocks -= other.numBlocks; - totalFileBytes -= other.totalFileBytes; - } - - // Accumulate the statistics of the fd into this FileMetadataStats. - public void accumulate(FileDescriptor fd) { - numBlocks += fd.getNumFileBlocks(); - totalFileBytes += fd.getFileLength(); - ++numFiles; - } - } - // Table level storage-related statistics. Depending on whether the table is stored in // the catalog server or the impalad catalog cache, these statistics serve different // purposes and, hence, are managed differently. diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java index 1b258467e..c77766723 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java @@ -44,8 +44,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.ContentFile; +import org.apache.impala.catalog.FeFsTable.FileMetadataStats; import org.apache.impala.catalog.FeIcebergTable.Utils; -import org.apache.impala.catalog.HdfsTable.FileMetadataStats; import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Reference; diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 
7a20d88bf..38d72914c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -17,6 +17,7 @@ package org.apache.impala.catalog; +import com.codahale.metrics.Gauge; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -237,6 +238,8 @@ public class IcebergTable extends Table implements FeIcebergTable { private Map<Integer, IcebergColumn> icebergFieldIdToCol_; private Map<String, TIcebergPartitionStats> partitionStats_; + private final FileMetadataStats fileMetadataStats_ = new FileMetadataStats(); + protected IcebergTable(org.apache.hadoop.hive.metastore.api.Table msTable, Db db, String name, String owner) { super(msTable, db, name, owner); @@ -405,6 +408,27 @@ public class IcebergTable extends Table implements FeIcebergTable { return table; } + @Override + public void initMetrics() { + super.initMetrics(); + metrics_.addGauge(NUM_FILES_METRIC, new Gauge<Long>() { + @Override + public Long getValue() { return fileMetadataStats_.numFiles; } + }); + metrics_.addGauge(NUM_BLOCKS_METRIC, new Gauge<Long>() { + @Override + public Long getValue() { return fileMetadataStats_.numBlocks; } + }); + metrics_.addGauge(TOTAL_FILE_BYTES_METRIC, new Gauge<Long>() { + @Override + public Long getValue() { return fileMetadataStats_.totalFileBytes; } + }); + metrics_.addGauge(MEMORY_ESTIMATE_METRIC, new Gauge<Long>() { + @Override + public Long getValue() { return getEstimatedMetadataSize(); } + }); + } + /** * Loads the metadata of an Iceberg table. 
* <p> @@ -461,19 +485,27 @@ public class IcebergTable extends Table implements FeIcebergTable { loadAllColumnStats(msClient, catalogTimeline); applyPuffinNdvStats(catalogTimeline); setAvroSchema(msClient, msTbl, fileStore_, catalogTimeline); + updateMetrics(loader.getFileMetadataStats()); } catch (Exception e) { throw new IcebergTableLoadingException("Error loading metadata for Iceberg table " + icebergTableLocation_, e); } finally { storageMetadataLoadTime_ = ctxStorageLdTime.stop(); } - refreshLastUsedTime(); } finally { context.stop(); } } + private void updateMetrics(FileMetadataStats stats) { + long memUsageEstimate = stats.numFiles * PER_FD_MEM_USAGE_BYTES + + stats.numBlocks * PER_BLOCK_MEM_USAGE_BYTES; + setEstimatedMetadataSize(memUsageEstimate); + setNumFiles(stats.numFiles); + fileMetadataStats_.set(stats); + } + // Reads NDV stats from Puffin files belonging to the table (if any). // // If there are Puffin stats for different snapshots, the most recent one will be used diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index a97d8b42b..52c8f59fe 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -21,6 +21,7 @@ from tests.common.file_utils import grep_dir from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster from tests.common.impala_cluster import ImpalaCluster from tests.common.impala_test_suite import ImpalaTestSuite +from tests.util.filesystem_utils import IS_HDFS from tests.util.parse_util import parse_duration_string_ms from datetime import datetime from multiprocessing import Process, Queue @@ -1005,6 +1006,28 @@ class TestWebPage(ImpalaTestSuite): assert "catalog.hms-client-pool.num-idle" in metric_keys assert "catalog.hms-client-pool.num-in-use" in metric_keys + def test_iceberg_table_metrics(self): + assert '23448' == self.__get_table_metric( + "functional_parquet", "iceberg_non_partitioned", "total-file-size-bytes") + assert '20' == self.__get_table_metric( 
+ "functional_parquet", "iceberg_non_partitioned", "num-files") + assert '13000' == self.__get_table_metric( + "functional_parquet", "iceberg_non_partitioned", "memory-estimate-bytes") + if IS_HDFS: + assert '20' == self.__get_table_metric( + "functional_parquet", "iceberg_non_partitioned", "num-blocks") + + def __get_table_metric(self, db_name, tbl_name, metric): + self.client.execute("refresh %s.%s" % (db_name, tbl_name)) + responses = self.get_and_check_status(self.TABLE_METRICS_URL + "?name=%s.%s&json" % + (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT) + response_json = json.loads(responses[0].text) + metrics_text = response_json['table_metrics'] + for line in metrics_text.split('\n'): + if line.startswith(metric): + return line.split(": ")[1] + return None + def test_query_progress(self): """Tests that /queries page shows query progress.""" query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)"
