This is an automated email from the ASF dual-hosted git repository.

jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 19c12e0e06b3d94e370710825b5a71348b6abec3
Author: Daniel Becker <[email protected]>
AuthorDate: Tue Jul 22 16:08:17 2025 +0200

    IMPALA-14261: Take 'impala.computeStatsSnapshotId' into account when 
deciding between Puffin and HMS stats
    
    Since IMPALA-13609, Impala writes snapshot information for each column
    on COMPUTE STATS for Iceberg tables (see there for why it is useful),
    but this information has so far been ignored.
    
    After this change, snapshot information is used when deciding which of
    HMS and Puffin NDV stats should be used (i.e. which is more recent).
    
    This test also modifies the
    IcebergUtil.ComputeStatsSnapshotPropertyConverter class: previously
    Iceberg fieldIds were stored as Long, but now they are stored as
    Integer, in accordance with the Iceberg spec.
    
    Documentation:
     - updated the docs about Puffin stats in docs/topics/impala_iceberg.xml
    Testing:
     - modified existing tests to fit the new decision mechanism
    
    Change-Id: I95a5b152dd504e94dea368a107d412e33f67930c
    Reviewed-on: http://gerrit.cloudera.org:8080/23251
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Daniel Becker <[email protected]>
    Tested-by: Daniel Becker <[email protected]>
---
 docs/topics/impala_iceberg.xml                     |  9 ++---
 .../org/apache/impala/catalog/IcebergTable.java    | 29 ++++++++--------
 .../apache/impala/catalog/PuffinStatsLoader.java   | 24 ++++++-------
 .../impala/catalog/local/IcebergMetaProvider.java  |  2 +-
 .../java/org/apache/impala/util/IcebergUtil.java   | 39 +++++++++++++---------
 .../org/apache/impala/util/IcebergUtilTest.java    | 10 +++---
 tests/custom_cluster/test_iceberg_with_puffin.py   | 20 +++++------
 7 files changed, 70 insertions(+), 63 deletions(-)

diff --git a/docs/topics/impala_iceberg.xml b/docs/topics/impala_iceberg.xml
index e49133ddb..7756d43d3 100644
--- a/docs/topics/impala_iceberg.xml
+++ b/docs/topics/impala_iceberg.xml
@@ -896,10 +896,11 @@ ORDER BY made_current_at;
       come from different snapshots.
       </p>
       <p>
-      In case there are both HMS and Puffin stats for a column, the more 
recent one will
-      be used - for HMS stats we use the 'impala.lastComputeStatsTime' table 
property, and
-      for Puffin stats we use the snapshot timestamp to determine which one is 
more
-      recent.
+      In case there are both HMS and Puffin NDV stats for a column, the more 
recent one
+      will be used. For HMS stats we use the 'impala.computeStatsSnapshotId' 
table
+      property which stores, for each column, the snapshot for which HMS stats 
were
+      calculated. We compare this with the snapshot of the Puffin stats to 
decide which
+      is more recent.
       </p>
       <p>
       Reading Puffin stats is disabled by default; set the 
"--enable_reading_puffin_stats"
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 36799218f..6a71f007b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -574,12 +574,11 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
     if (!BackendConfig.INSTANCE.enableReadingPuffinStats()) return;
     if (!isPuffinStatsReadingEnabledForTable()) return;
 
-    long hmsStatsTimestampMs = getLastComputeStatsTimeMs();
-    Set<Integer> fieldIdsWithHmsStats = collectFieldIdsWithNdvStats();
+    Map<Integer, Long> fieldIdsWithHmsStats = 
getComputeStatsSnapshotMap(msTable_);
 
     Map<Integer, PuffinStatsLoader.PuffinStatsRecord> puffinNdvs =
         PuffinStatsLoader.loadPuffinStats(icebergApiTable_, getFullName(),
-            hmsStatsTimestampMs, fieldIdsWithHmsStats);
+            fieldIdsWithHmsStats);
     for (Map.Entry<Integer, PuffinStatsLoader.PuffinStatsRecord> entry
         : puffinNdvs.entrySet()) {
       int fieldId = entry.getKey();
@@ -600,12 +599,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
         // mode: in local catalog mode, the catalog sends the stats in HMS 
objects, so
         // NDVs for unsupported types would be lost.
         if (ColumnStats.supportsNdv(colType)) {
-          // Only use the value from Puffin if it is more recent than the HMS 
stat value
-          // or if the latter doesn't exist.
-          if (!col.getStats().hasNumDistinctValues()
-              || snapshot.timestampMillis() >= hmsStatsTimestampMs) {
-            col.getStats().setNumDistinctValues(ndv);
-          }
+          col.getStats().setNumDistinctValues(ndv);
         }
       }
     }
@@ -892,18 +886,23 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   public void updateComputeStatsIcebergSnapshotsProperty(
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       TAlterTableUpdateStatsParams params) {
-    String snapshotIds = msTbl.getParameters().get(
-        IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS);
+    TreeMap<Integer, Long> computeStatsMap = getComputeStatsSnapshotMap(msTbl);
 
-    TreeMap<Long, Long> computeStatsMap =
-        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(snapshotIds);
     updateComputeStatsIcebergSnapshotMap(computeStatsMap, params);
     String property =
         
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(computeStatsMap);
     msTbl.putToParameters(IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS, property);
   }
 
-  private void updateComputeStatsIcebergSnapshotMap(Map<Long, Long> map,
+  private TreeMap<Integer, Long> getComputeStatsSnapshotMap(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    String snapshotIds = msTbl.getParameters().get(
+        IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS);
+
+    return 
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(snapshotIds);
+  }
+
+  private void updateComputeStatsIcebergSnapshotMap(Map<Integer, Long> map,
       TAlterTableUpdateStatsParams params) {
     // This will be -1 if there is no snapshot yet.
     Preconditions.checkState(params.isSetSnapshot_id());
@@ -912,7 +911,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
     // Insert/update columns for which we have computed stats.
     if (params.isSetColumn_stats()) {
       for (String colName : params.column_stats.keySet()) {
-        long fieldId = 
getIcebergApiTable().schema().findField(colName).fieldId();
+        int fieldId = 
getIcebergApiTable().schema().findField(colName).fieldId();
         map.put(fieldId, currentSnapshotId);
       }
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java
index a90189e4f..1ecc242d2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java
@@ -66,10 +66,9 @@ public class PuffinStatsLoader {
   private final Table iceApiTable_;
   private final String tblName_;
 
-  // The timestamp of the HMS stats and columns that have HMS stats. Puffin 
NDVs will only
-  // be loaded if they are more recent than existing HMS stats.
-  private final long hmsStatsTimestampMs_;
-  private final Set<Integer> fieldIdsWithHmsStats_;
+  // Map of fields that have HMS stats - the values are the snapshot IDs. 
Puffin NDVs will
+  // only be loaded if they are more recent than existing HMS stats.
+  private final Map<Integer, Long> fieldIdsWithHmsStats_;
 
   // The blobs to read from Puffin files. Initialised in 'initBlobsToRead()'. 
The keys of
   // the inner map are fieldIds and its values are snapshotIds - together they 
identify
@@ -94,11 +93,10 @@ public class PuffinStatsLoader {
     }
   }
 
-  private PuffinStatsLoader(Table iceApiTable, String tblName, long 
hmsStatsTimestampMs,
-      Set<Integer> fieldIdsWithHmsStats) {
+  private PuffinStatsLoader(Table iceApiTable, String tblName,
+      Map<Integer, Long> fieldIdsWithHmsStats) {
     iceApiTable_ = iceApiTable;
     tblName_ = tblName;
-    hmsStatsTimestampMs_ = hmsStatsTimestampMs;
     fieldIdsWithHmsStats_ = fieldIdsWithHmsStats;
   }
 
@@ -107,15 +105,16 @@ public class PuffinStatsLoader {
    * column, the most recent available NDV value is chosen.
    *
    * Stats for columns in 'fieldIdsWithHmsStats' are only loaded if they 
belong to a
-   * snapshot that is more recent than 'hmsStatsTimestampMs'.
+   * snapshot that is more recent than the corresponding snapshot id in the 
map, i.e. if
+   * the Puffin NDV is more recent than the HMS one.
    *
    * If it is detected that there are multiple blobs for a given 
fieldId-snapshotId pair,
    * a warning log is issued, but no attempt is made to detect all such cases.
    */
   public static Map<Integer, PuffinStatsRecord> loadPuffinStats(Table 
iceApiTable,
-      String tblName, long hmsStatsTimestampMs, Set<Integer> 
fieldIdsWithHmsStats) {
+      String tblName, Map<Integer, Long> fieldIdsWithHmsStats) {
     PuffinStatsLoader loader = new PuffinStatsLoader(iceApiTable, tblName,
-        hmsStatsTimestampMs, fieldIdsWithHmsStats);
+        fieldIdsWithHmsStats);
     return loader.loadPuffinStatsImpl();
   }
 
@@ -194,8 +193,9 @@ public class PuffinStatsLoader {
   // Returns true if there are HMS stats for the column referenced by 
'fieldId' that are
   // at least as recent as the snapshot referenced by 'snapshotId'
   private boolean hmsHasMoreRecentStats(int fieldId, long snapshotId) {
-    long snapshotTs = iceApiTable_.snapshot(snapshotId).timestampMillis();
-    return hmsStatsTimestampMs_ >= snapshotTs && 
fieldIdsWithHmsStats_.contains(fieldId);
+    Long hmsSnapshot = fieldIdsWithHmsStats_.get(fieldId);
+    if (hmsSnapshot == null) return false;
+    return isMoreRecentSnapshot(hmsSnapshot, snapshotId);
   }
 
   // Checks the metadata of 'statsFile' and loads NDV values where available.
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
index cb8d6624b..60a527ed6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
@@ -325,7 +325,7 @@ public class IcebergMetaProvider implements MetaProvider {
     org.apache.iceberg.Table iceTbl = tblImpl.iceApiTbl_;
     Map<Integer, PuffinStatsLoader.PuffinStatsRecord> puffinStats =
         PuffinStatsLoader.loadPuffinStats(iceTbl, tblImpl.fullName(),
-            -1, Collections.emptySet());
+            Collections.emptyMap());
 
     List<ColumnStatisticsObj> res = new ArrayList<>();
     for (String colName : colNames) {
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index c8f23cde2..e54155725 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -1380,9 +1380,18 @@ public class IcebergUtil {
     return "";
   }
 
+  /**
+   * This is a helper class for converting the value of the
+   * 'impala.computeStatsSnapshotIds' table property to a Map representation 
and back.
+   *
+   * In the case of Iceberg tables, for each column with HMS stats, this 
property stores
+   * the snapshot id for which stats have been computed. It is a 
comma-separated list of
+   * values of the form "fieldIdRangeStart[-fieldIdRangeEndIncl]:snapshotId". 
The fieldId
+   * part may be a single value or a contiguous, inclusive range.
+   */
   public static class ComputeStatsSnapshotPropertyConverter {
-    public static TreeMap<Long, Long> stringToMap(String snapshotIds) {
-      TreeMap<Long, Long> res = new TreeMap<Long, Long>();
+    public static TreeMap<Integer, Long> stringToMap(String snapshotIds) {
+      TreeMap<Integer, Long> res = new TreeMap<Integer, Long>();
       if (snapshotIds == null) return res;
 
       String[] columns = snapshotIds.split(",");
@@ -1399,11 +1408,11 @@ public class IcebergUtil {
 
           String[] colRange = colStr.split("-");
           if (colRange.length == 1) {
-            res.put(Long.parseLong(colRange[0]), snapshotId);
+            res.put(Integer.parseInt(colRange[0]), snapshotId);
           } else if (colRange.length == 2) {
-            long rangeStart = Long.parseLong(colRange[0]);
-            long rangeEnd = Long.parseLong(colRange[1]);
-            for (long colId = rangeStart; colId <= rangeEnd; ++colId) {
+            int rangeStart = Integer.parseInt(colRange[0]);
+            int rangeEnd = Integer.parseInt(colRange[1]);
+            for (int colId = rangeStart; colId <= rangeEnd; ++colId) {
               res.put(colId, snapshotId);
             }
           } else {
@@ -1417,11 +1426,11 @@ public class IcebergUtil {
       return res;
     }
 
-    public static String mapToString(TreeMap<Long, Long> colAndSnapshotIds) {
+    public static String mapToString(TreeMap<Integer, Long> colAndSnapshotIds) 
{
       ConversionState state = new ConversionState();
 
-      for (Map.Entry<Long, Long> entry : colAndSnapshotIds.entrySet()) {
-        long col = entry.getKey();
+      for (Map.Entry<Integer, Long> entry : colAndSnapshotIds.entrySet()) {
+        int col = entry.getKey();
         long snapshotId = entry.getValue();
 
         if (state.canContinueRange(col, snapshotId)) {
@@ -1436,7 +1445,7 @@ public class IcebergUtil {
       return state.getResult();
     }
 
-    private static TreeMap<Long, Long> logAndReturnEmptyMap(String 
snapshotIdsStr) {
+    private static TreeMap<Integer, Long> logAndReturnEmptyMap(String 
snapshotIdsStr) {
       LOG.warn(String.format(
           "Invalid value for table property '%s': \"%s\". Ignoring it.",
           IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS, snapshotIdsStr));
@@ -1445,13 +1454,13 @@ public class IcebergUtil {
 
     private static class ConversionState {
       // Intentionally not using -1 as that is the snapshot ID of empty tables.
-      private static final long INVALID = -10;
-      private long colRangeStart_ = INVALID;
-      private long lastCol_ = INVALID;
+      private static final int INVALID = -10;
+      private int colRangeStart_ = INVALID;
+      private int lastCol_ = INVALID;
       private long lastSnapshotId_ = INVALID;
       private final StringBuilder sb_ = new StringBuilder();
 
-      private boolean canContinueRange(long col, long snapshotId) {
+      private boolean canContinueRange(int col, long snapshotId) {
         return lastCol_ != INVALID && lastSnapshotId_ != INVALID
             && lastSnapshotId_ == snapshotId && (lastCol_ + 1 == col);
       }
@@ -1468,7 +1477,7 @@ public class IcebergUtil {
         }
       }
 
-      private void initNewRange(long col, long snapshotId) {
+      private void initNewRange(int col, long snapshotId) {
         if (colRangeStart_ != INVALID) sb_.append(",");
         colRangeStart_ = col;
         lastCol_ = col;
diff --git a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java 
b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
index f758d9109..15e8c9879 100644
--- a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
@@ -477,18 +477,18 @@ public class IcebergUtilTest {
 
     // Fill a disjunct range
     String disjunctRanges = "1-4:1234,6-8:1234,10:2345";
-    TreeMap<Long, Long> disjunctRangesMap =
+    TreeMap<Integer, Long> disjunctRangesMap =
         
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(disjunctRanges);
-    disjunctRangesMap.put(5L, 1234L);
+    disjunctRangesMap.put(5, 1234L);
     String completedRange = "1-8:1234,10:2345";
     assertEquals(completedRange,
         
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(disjunctRangesMap));
 
     // Split a range
     String rangeToSplit = "1-8:1234,10:2345";
-    TreeMap<Long, Long> rangeToSplitMap =
+    TreeMap<Integer, Long> rangeToSplitMap =
         
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(rangeToSplit);
-    rangeToSplitMap.put(4L, 2345L);
+    rangeToSplitMap.put(4, 2345L);
     String splitRanges = "1-3:1234,4:2345,5-8:1234,10:2345";
     assertEquals(splitRanges,
         
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(rangeToSplitMap));
@@ -514,7 +514,7 @@ public class IcebergUtilTest {
   }
 
   private String computeStatsPropertyRoundTrip(String property) {
-    TreeMap<Long, Long> map =
+    TreeMap<Integer, Long> map =
         
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(property);
     return IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(map);
   }
diff --git a/tests/custom_cluster/test_iceberg_with_puffin.py 
b/tests/custom_cluster/test_iceberg_with_puffin.py
index f70ffc5c7..bea1c22b3 100644
--- a/tests/custom_cluster/test_iceberg_with_puffin.py
+++ b/tests/custom_cluster/test_iceberg_with_puffin.py
@@ -117,12 +117,9 @@ class 
TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
 
     self._change_metadata_json_file(tbl_info, 
"not_all_blobs_current.metadata.json")
 
-    # Get the latest snapshot's timestamp, set an HMS stat and set the last 
compute stats
-    # property to one second before the latest snapshot.
-    latest_snapshot_timestamp = self._get_latest_snapshot_timestamp(
-        tbl_info.full_tbl_name)
-    timestamp_before_snapshot = latest_snapshot_timestamp - 1
+    prev_snapshot_id = self._get_snapshot_ids(tbl_info.full_tbl_name)[1]
 
+    # Set HMS stats for the first and third column, for the previous snapshot.
     # There are Puffin stats from the latest snapshot for the first two 
columns, and older
     # Puffin stats for the next two columns - the HMS stats are more recent 
than these
     # older Puffin stats.
@@ -131,8 +128,9 @@ class 
TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
             tbl_info.full_tbl_name),
         "alter table {} set column stats bigint_col('numDVs'='300')".format(
             tbl_info.full_tbl_name),
-        "alter table {} set 
tblproperties('impala.lastComputeStatsTime'='{}')".format(
-            tbl_info.full_tbl_name, timestamp_before_snapshot)
+        "alter table {} set tblproperties('impala.computeStatsSnapshotIds'= \
+            
'1:{prev_snapshot_id},3:{prev_snapshot_id}')".format(tbl_info.full_tbl_name,
+                prev_snapshot_id=prev_snapshot_id)
     ]
     for stmt in stmts:
       self.execute_query(stmt)
@@ -175,12 +173,12 @@ class 
TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
 
     return self.TblInfo(tbl_name, vector, tbl_loc, tbl_properties)
 
-  def _get_latest_snapshot_timestamp(self, tbl_name):
-    query_template = "select unix_timestamp(max(committed_at)) \
-        latest_snapshot from {}.snapshots"
+  def _get_snapshot_ids(self, tbl_name):
+    """ Returns the list of the table's snapshot ids, starting with the 
current one."""
+    query_template = "select snapshot_id from {}.snapshots order by 
committed_at desc"
     query = query_template.format(tbl_name)
     query_res = self.execute_query(query)
-    return int(query_res.data[0])
+    return query_res.data
 
   def _copy_files_to_puffin_tbl(self, tbl_name, tbl_loc, uuid):
     version_info = sys.version_info

Reply via email to