This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d71479890 IMPALA-13609: Store Iceberg snapshot id for COMPUTE STATS
d71479890 is described below

commit d714798904de449cd629df53934d8336bd767512
Author: Daniel Becker <[email protected]>
AuthorDate: Wed Jan 8 18:42:31 2025 +0100

    IMPALA-13609: Store Iceberg snapshot id for COMPUTE STATS
    
    Currently, when COMPUTE STATS is run from Impala, we set the
    'impala.lastComputeStatsTime' table property. Iceberg Puffin stats, on
    the other hand, store the snapshot id for which the stats were
    calculated. Although it is possible to retrieve the timestamp of a
    snapshot, comparing these two values is error-prone, e.g. in the
    following situation:
    
     - COMPUTE STATS calculation is running on snapshot N
     - snapshot N+1 is committed at time T
     - COMPUTE STATS finishes and sets 'impala.lastComputeStatsTime' at time
       T + Delta
     - some engine writes Puffin statistics for snapshot N+1
    
    After this, HMS stats will appear to be more recent even though they
    were calculated on snapshot N, while we have Puffin stats for snapshot
    N+1.
    
    To make comparisons easier, after this change, COMPUTE STATS sets a new
    table property, 'impala.computeStatsSnapshotIds'. This property stores
    the snapshot id for which stats have been computed, for each column. It
    is a comma-separated list of values of the form
    "fieldIdRangeStart[-fieldIdRangeEndIncl]:snapshotId". The fieldId part
    may be a single value or a contiguous, inclusive range, for example
    "1-4:1234,6:2345".
    
    Storing the snapshot ids on a per-column basis is needed because COMPUTE
    STATS can be set to calculate stats for only a subset of the columns,
    and then a different subset in a subsequent run. The recency of the
    stats will then be different for each column.
    
    Storing the Iceberg field ids instead of column names makes the format
    easier to handle as we do not need to take care of escaping special
    characters.
    
    The 'impala.computeStatsSnapshotIds' table property is deleted after
    DROP STATS.
    
    Note that this change does not yet modify how Impala chooses between
    Puffin and HMS stats: that will be done in a separate change.
    
    Testing:
     - Added tests in iceberg-compute-stats.test checking that
       'impala.computeStatsSnapshotIds' is set correctly and is deleted
       after DROP STATS
     - Added unit tests in IcebergUtilTest.java that check the parsing and
       serialisation of the table property
    
    Change-Id: Id9998b84c4fd20d1cf5e97a34f3553832ec70ae7
    Reviewed-on: http://gerrit.cloudera.org:8080/22339
    Reviewed-by: Daniel Becker <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/catalog-op-executor.cc                 |   4 +-
 be/src/exec/catalog-op-executor.h                  |   3 +-
 be/src/service/client-request-state.cc             |  21 +++-
 common/thrift/JniCatalog.thrift                    |   6 ++
 .../apache/impala/analysis/ComputeStatsStmt.java   |   4 +
 .../org/apache/impala/catalog/IcebergTable.java    |  53 ++++++++++-
 .../apache/impala/service/CatalogOpExecutor.java   |  19 +++-
 .../java/org/apache/impala/util/IcebergUtil.java   | 106 +++++++++++++++++++++
 .../org/apache/impala/util/IcebergUtilTest.java    |  62 +++++++++++-
 .../queries/QueryTest/iceberg-compute-stats.test   |  57 +++++++++++
 10 files changed, 327 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/catalog-op-executor.cc 
b/be/src/exec/catalog-op-executor.cc
index b14a57348..042d71767 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -160,7 +160,8 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& 
request) {
 Status CatalogOpExecutor::ExecComputeStats(const TCatalogServiceRequestHeader& 
header,
     const TCatalogOpRequest& compute_stats_request,
     const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
-    const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
+    const TTableSchema& col_stats_schema, const TRowSet& col_stats_data,
+    const std::optional<long>& snapshot_id) {
   // Create a new DDL request to alter the table's statistics.
   TCatalogOpRequest catalog_op_req;
   catalog_op_req.__isset.ddl_params = true;
@@ -185,6 +186,7 @@ Status CatalogOpExecutor::ExecComputeStats(const 
TCatalogServiceRequestHeader& h
   update_stats_params.__set_expect_all_partitions(
       compute_stats_params.expect_all_partitions);
   
update_stats_params.__set_is_incremental(compute_stats_params.is_incremental);
+  if (snapshot_id.has_value()) 
update_stats_params.__set_snapshot_id(snapshot_id.value());
 
   // Fill the alteration request based on the child-query results.
   SetTableStats(tbl_stats_schema, tbl_stats_data,
diff --git a/be/src/exec/catalog-op-executor.h 
b/be/src/exec/catalog-op-executor.h
index 33eabfb3a..98bf92317 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -62,7 +62,8 @@ class CatalogOpExecutor {
       const apache::hive::service::cli::thrift::TTableSchema& tbl_stats_schema,
       const apache::hive::service::cli::thrift::TRowSet& tbl_stats_data,
       const apache::hive::service::cli::thrift::TTableSchema& col_stats_schema,
-      const apache::hive::service::cli::thrift::TRowSet& col_stats_data);
+      const apache::hive::service::cli::thrift::TRowSet& col_stats_data,
+      const std::optional<long>& snapshot_id);
 
   /// Makes an RPC to the CatalogServer to prioritize the loading of the 
catalog objects
   /// specified in the TPrioritizeLoadRequest. Returns OK if the RPC was 
successful,
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 22bc65b40..eefefe438 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -17,6 +17,8 @@
 
 #include "service/client-request-state.h"
 
+#include <optional>
+
 #include <boost/algorithm/string/join.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 #include <boost/algorithm/string/replace.hpp>
@@ -53,6 +55,7 @@
 #include "util/lineage-util.h"
 #include "util/pretty-printer.h"
 #include "util/redactor.h"
+#include "util/runtime-profile.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"
 #include "util/uid-util.h"
@@ -1866,6 +1869,20 @@ void ClientRequestState::MarkActive() {
   ++ref_count_;
 }
 
+std::optional<long> getIcebergSnapshotId(const TExecRequest& exec_req) {
+  DCHECK(exec_req.__isset.catalog_op_request);
+  DCHECK(exec_req.catalog_op_request.__isset.ddl_params);
+  DCHECK(exec_req.catalog_op_request.ddl_params.__isset.compute_stats_params);
+
+  const TComputeStatsParams& compute_stats_params =
+    exec_req.catalog_op_request.ddl_params.compute_stats_params;
+  if (compute_stats_params.__isset.iceberg_snapshot_id) {
+    return std::optional<long>(compute_stats_params.iceberg_snapshot_id);
+  } else {
+    return {};
+  }
+}
+
 Status ClientRequestState::UpdateTableAndColumnStats(
     const vector<ChildQuery*>& child_queries) {
   DCHECK_GE(child_queries.size(), 1);
@@ -1883,13 +1900,15 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
 
   const TExecRequest& exec_req = exec_request();
+  std::optional<long> snapshot_id = getIcebergSnapshotId(exec_req);
   Status status = catalog_op_executor_->ExecComputeStats(
       GetCatalogServiceRequestHeader(),
       exec_req.catalog_op_request,
       child_queries[0]->result_schema(),
       child_queries[0]->result_data(),
       col_stats_schema,
-      col_stats_data);
+      col_stats_data,
+      snapshot_id);
   AddCatalogTimeline();
   {
     lock_guard<mutex> l(lock_);
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 369de115a..1b91bec65 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -383,6 +383,9 @@ struct TAlterTableUpdateStatsParams {
 
   // If true, this is the result of an incremental stats computation
   6: optional bool is_incremental
+
+  // The snapshotId if this is an Iceberg table
+  7: optional i64 snapshot_id
 }
 
 // Parameters for ALTER TABLE SET [PARTITION partitionSet] CACHED|UNCACHED
@@ -697,6 +700,9 @@ struct TComputeStatsParams {
   // Sum of file sizes in the table. Only set for tables of type HDFS_TABLE 
and if
   // is_incremental is false.
   9: optional i64 total_file_bytes
+
+  // If this is an Iceberg table, stores the snapshot id for which stats are 
computed.
+  10: optional i64 iceberg_snapshot_id;
 }
 
 // Parameters for CREATE/DROP ROLE
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index f1e1e4116..14889899e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -1073,6 +1073,10 @@ public class ComputeStatsStmt extends StatementBase {
     if (table_ instanceof FeFsTable) {
       params.setTotal_file_bytes(((FeFsTable)table_).getTotalHdfsBytes());
     }
+
+    if (table_ instanceof FeIcebergTable) {
+      params.setIceberg_snapshot_id(((FeIcebergTable) table_).snapshotId());
+    }
     return params;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 09d6e8d12..75c026630 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -21,6 +21,7 @@ import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,8 +31,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -45,6 +47,7 @@ import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.CatalogLookupStatus;
+import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
@@ -121,6 +124,11 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   public static final String ICEBERG_DISABLE_READING_PUFFIN_STATS =
       "impala.iceberg_read_puffin_stats";
 
+  // Table property that can be used to store for each column the snapshot id 
for which
+  // stats are stored in HMS (i.e. not Puffin stats).
+  public static final String COMPUTE_STATS_SNAPSHOT_IDS =
+      "impala.computeStatsSnapshotIds";
+
   // Internal Iceberg table property that specifies the absolute path of the 
current
   // table metadata. This property is only valid for tables in 'hive.catalog'.
   public static final String METADATA_LOCATION = "metadata_location";
@@ -827,4 +835,47 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   public IcebergContentFileStore getContentFileStore() {
     return fileStore_;
   }
+
+  /**
+   * The IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS property stores the snapshot 
id for which
+   * stats have been computed, for each column. It is a comma-separated list 
of values of
+   * the form "fieldIdRangeStart[-fieldIdRangeEndIncl]:snapshotId". The 
fieldId part may
+   * be a single value or a contiguous, inclusive range.
+   *
+   * Storing the snapshot ids on a per-column basis is needed because COMPUTE 
STATS can be
+   * set to calculate stats for only a subset of the columns, and then a 
different subset
+   * in a subsequent run. The recency of the stats will then be different for 
each column.
+   *
+   * Storing the Iceberg field ids instead of column names makes the format 
easier to
+   * handle as we do not need to take care of escaping special characters.
+   */
+  public void updateComputeStatsIcebergSnapshotsProperty(
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      TAlterTableUpdateStatsParams params) {
+    String snapshotIds = msTbl.getParameters().get(
+        IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS);
+
+    TreeMap<Long, Long> computeStatsMap =
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(snapshotIds);
+    updateComputeStatsIcebergSnapshotMap(computeStatsMap, params);
+    String property =
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(computeStatsMap);
+    msTbl.putToParameters(IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS, property);
+  }
+
+  private void updateComputeStatsIcebergSnapshotMap(Map<Long, Long> map,
+      TAlterTableUpdateStatsParams params) {
+    // This will be -1 if there is no snapshot yet.
+    Preconditions.checkState(params.isSetSnapshot_id());
+    final long currentSnapshotId = params.snapshot_id;
+
+    // Insert/update columns for which we have computed stats.
+    if (params.isSetColumn_stats()) {
+      for (String colName : params.column_stats.keySet()) {
+        long fieldId = 
getIcebergApiTable().schema().findField(colName).fieldId();
+        map.put(fieldId, currentSnapshotId);
+      }
+    }
+  }
+
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index eab8ce765..c8cdde9c8 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1994,6 +1994,9 @@ public class CatalogOpExecutor {
       // Set impala.lastComputeStatsTime just before alter_table to ensure 
that it is as
       // accurate as possible.
       Table.updateTimestampProperty(msTbl, 
HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME);
+      if (IcebergTable.isIcebergTable(msTbl)) {
+        ((IcebergTable) 
table).updateComputeStatsIcebergSnapshotsProperty(msTbl, params);
+      }
     }
 
     if (IcebergTable.isIcebergTable(msTbl) && 
isIcebergHmsIntegrationEnabled(msTbl)) {
@@ -2026,6 +2029,7 @@ public class CatalogOpExecutor {
     String CATALOG_SERVICE_ID = 
MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey();
     String CATALOG_VERSION    = 
MetastoreEventPropertyKey.CATALOG_VERSION.getKey();
     String COMPUTE_STATS_TIME = HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME;
+    String COMPUTE_STATS_SNAPSHOT_IDS = 
IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS;
 
     
Preconditions.checkState(msTbl.getParameters().containsKey(CATALOG_SERVICE_ID));
     
Preconditions.checkState(msTbl.getParameters().containsKey(CATALOG_VERSION));
@@ -2036,6 +2040,10 @@ public class CatalogOpExecutor {
     if (msTbl.getParameters().containsKey(COMPUTE_STATS_TIME)) {
       props.put(COMPUTE_STATS_TIME, 
msTbl.getParameters().get(COMPUTE_STATS_TIME));
     }
+    if (msTbl.getParameters().containsKey(COMPUTE_STATS_SNAPSHOT_IDS)) {
+      props.put(COMPUTE_STATS_SNAPSHOT_IDS,
+          msTbl.getParameters().get(COMPUTE_STATS_SNAPSHOT_IDS));
+    }
 
     org.apache.iceberg.Transaction iceTxn = 
IcebergUtil.getIcebergTransaction(iceTbl);
     IcebergCatalogOpExecutor.setTblProperties(iceTxn, props);
@@ -2726,6 +2734,7 @@ public class CatalogOpExecutor {
       catalog_.getLock().writeLock().unlock();
       modification.addCatalogServiceIdentifiersToTable();
       modification.registerInflightEvent();
+
       if (params.getPartition_set() == null) {
         // TODO: Report the number of updated partitions/columns to the user?
         // TODO: bulk alter the partitions.
@@ -2833,8 +2842,11 @@ public class CatalogOpExecutor {
         msTbl.getParameters().remove(StatsSetupConst.TOTAL_SIZE) != null;
     boolean droppedLastCompute =
         
msTbl.getParameters().remove(HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME) != 
null;
+    boolean droppedIcebergComputeStatsSnapshotIds =
+        msTbl.getParameters().remove(IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS) 
!= null;
 
-    if (droppedRowCount || droppedTotalSize || droppedLastCompute) {
+    if (droppedRowCount || droppedTotalSize || droppedLastCompute
+        || droppedIcebergComputeStatsSnapshotIds) {
       applyAlterTable(msTbl, false, null, catalogTimeline);
       ++numTargetedPartitions;
     }
@@ -2889,8 +2901,9 @@ public class CatalogOpExecutor {
     boolean isIntegratedIcebergTbl =
         IcebergTable.isIcebergTable(msTbl) && 
isIcebergHmsIntegrationEnabled(msTbl);
     Preconditions.checkState(isIntegratedIcebergTbl);
-    IcebergCatalogOpExecutor.unsetTblProperties(
-        iceTxn, 
Collections.singletonList(HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME));
+    IcebergCatalogOpExecutor.unsetTblProperties(iceTxn,
+       Arrays.asList(HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME,
+           IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS));
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 78a99a72f..d299b7107 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
@@ -115,6 +116,8 @@ import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionField;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("UnstableApiUsage")
 public class IcebergUtil {
@@ -126,6 +129,8 @@ public class IcebergUtil {
   private static final int ICEBERG_EPOCH_HOUR = 0;
   public static final String HIVE_CATALOG = "hive.catalog";
 
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergUtil.class);
+
   /**
    * Returns the corresponding catalog implementation for 'feTable'.
    */
@@ -1357,4 +1362,105 @@ public class IcebergUtil {
     }
     return "";
   }
+
+  public static class ComputeStatsSnapshotPropertyConverter {
+    public static TreeMap<Long, Long> stringToMap(String snapshotIds) {
+      TreeMap<Long, Long> res = new TreeMap<Long, Long>();
+      if (snapshotIds == null) return res;
+
+      String[] columns = snapshotIds.split(",");
+
+      for (String column : columns) {
+        String[] colIdAndSnapshotId = column.split(":");
+        if (colIdAndSnapshotId.length != 2) {
+          return logAndReturnEmptyMap(snapshotIds);
+        }
+
+        try {
+          String colStr = colIdAndSnapshotId[0];
+          Long snapshotId = Long.parseLong(colIdAndSnapshotId[1]);
+
+          String[] colRange = colStr.split("-");
+          if (colRange.length == 1) {
+            res.put(Long.parseLong(colRange[0]), snapshotId);
+          } else if (colRange.length == 2) {
+            long rangeStart = Long.parseLong(colRange[0]);
+            long rangeEnd = Long.parseLong(colRange[1]);
+            for (long colId = rangeStart; colId <= rangeEnd; ++colId) {
+              res.put(colId, snapshotId);
+            }
+          } else {
+            return logAndReturnEmptyMap(snapshotIds);
+          }
+        } catch (NumberFormatException e) {
+          return logAndReturnEmptyMap(snapshotIds);
+        }
+      }
+
+      return res;
+    }
+
+    public static String mapToString(TreeMap<Long, Long> colAndSnapshotIds) {
+      ConversionState state = new ConversionState();
+
+      for (Map.Entry<Long, Long> entry : colAndSnapshotIds.entrySet()) {
+        long col = entry.getKey();
+        long snapshotId = entry.getValue();
+
+        if (state.canContinueRange(col, snapshotId)) {
+          state.extendRange();
+        } else {
+          state.flushRange();
+          state.initNewRange(col, snapshotId);
+        }
+      }
+      state.flushRange();
+
+      return state.getResult();
+    }
+
+    private static TreeMap<Long, Long> logAndReturnEmptyMap(String 
snapshotIdsStr) {
+      LOG.warn(String.format(
+          "Invalid value for table property '%s': \"%s\". Ignoring it.",
+          IcebergTable.COMPUTE_STATS_SNAPSHOT_IDS, snapshotIdsStr));
+      return new TreeMap<>();
+    }
+
+    private static class ConversionState {
+      // Intentionally not using -1 as that is the snapshot ID of empty tables.
+      private static final long INVALID = -10;
+      private long colRangeStart_ = INVALID;
+      private long lastCol_ = INVALID;
+      private long lastSnapshotId_ = INVALID;
+      private final StringBuilder sb_ = new StringBuilder();
+
+      private boolean canContinueRange(long col, long snapshotId) {
+        return lastCol_ != INVALID && lastSnapshotId_ != INVALID
+            && lastSnapshotId_ == snapshotId && (lastCol_ + 1 == col);
+      }
+
+      private void extendRange() {
+        ++lastCol_;
+      }
+
+      private void flushRange() {
+        if (colRangeStart_ != INVALID) {
+          sb_.append(colRangeStart_);
+          if (lastCol_ != colRangeStart_) sb_.append("-").append(lastCol_);
+          sb_.append(":").append(lastSnapshotId_);
+        }
+      }
+
+      private void initNewRange(long col, long snapshotId) {
+        if (colRangeStart_ != INVALID) sb_.append(",");
+        colRangeStart_ = col;
+        lastCol_ = col;
+        lastSnapshotId_ = snapshotId;
+      }
+
+      private String getResult() {
+        return sb_.toString();
+      }
+    }
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java 
b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
index 0090dd75c..f758d9109 100644
--- a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
@@ -72,6 +72,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
+import java.util.TreeMap;
 
 /**
  * Unit tests for Iceberg Utilities.
@@ -458,4 +459,63 @@ public class IcebergUtilTest {
       this.clazz = clazz;
     }
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testComputeStatsProperty() {
+    String empty = "";
+    assertEquals(empty, computeStatsPropertyRoundTrip(empty));
+
+    String singleCols = "1:1234,2:1235,4:2345,5:2346";
+    assertEquals(singleCols, computeStatsPropertyRoundTrip(singleCols));
+
+    String simpleRanges = "1-4:1234,6-10:2345";
+    assertEquals(simpleRanges, computeStatsPropertyRoundTrip(simpleRanges));
+
+    String rangesToMerge = "1-4:1234,5-8:1234,10:2345";
+    String mergedRanges = "1-8:1234,10:2345";
+    assertEquals(mergedRanges, computeStatsPropertyRoundTrip(rangesToMerge));
+
+    // Fill a disjunct range
+    String disjunctRanges = "1-4:1234,6-8:1234,10:2345";
+    TreeMap<Long, Long> disjunctRangesMap =
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(disjunctRanges);
+    disjunctRangesMap.put(5L, 1234L);
+    String completedRange = "1-8:1234,10:2345";
+    assertEquals(completedRange,
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(disjunctRangesMap));
+
+    // Split a range
+    String rangeToSplit = "1-8:1234,10:2345";
+    TreeMap<Long, Long> rangeToSplitMap =
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(rangeToSplit);
+    rangeToSplitMap.put(4L, 2345L);
+    String splitRanges = "1-3:1234,4:2345,5-8:1234,10:2345";
+    assertEquals(splitRanges,
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(rangeToSplitMap));
+
+    // Invalid properties yield empty maps
+    TreeMap<Long, Long> emptyMap = new TreeMap<>();
+    String invalid1 = "1:";
+    assertEquals(emptyMap,
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(invalid1));
+
+    // Spaces are not allowed
+    String invalid2 = "1:1234, 2:2345";
+    assertEquals(emptyMap,
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(invalid2));
+
+    String invalid3 = ":1234";
+    assertEquals(emptyMap,
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(invalid3));
+
+    String invalid4 = "1-3-5:1234";
+    assertEquals(emptyMap,
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(invalid4));
+  }
+
+  private String computeStatsPropertyRoundTrip(String property) {
+    TreeMap<Long, Long> map =
+        
IcebergUtil.ComputeStatsSnapshotPropertyConverter.stringToMap(property);
+    return IcebergUtil.ComputeStatsSnapshotPropertyConverter.mapToString(map);
+  }
+}
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compute-stats.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compute-stats.test
index 5a4d96f3c..e03307d65 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compute-stats.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compute-stats.test
@@ -389,3 +389,60 @@ COMPUTE INCREMENTAL STATS ice_alltypes_part PARTITION 
(i=1);
 ---- CATCH
 COMPUTE INCREMENTAL ... PARTITION not supported for Iceberg table
 ====
+
+---- QUERY
+# Tests for 'impala.computeStatsSnapshotIds'.
+create table test_ice (
+  i INT,
+  b BIGINT,
+  s STRING
+) stored as iceberg;
+compute stats test_ice;
+describe formatted test_ice;
+---- RESULTS: VERIFY_IS_SUBSET
+# For a table that contains no snapshot, the snapshot ids are -1.
+row_regex:'','impala.computeStatsSnapshotIds','1-3:-1\s*'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Create a new snapshot, compute stats, and verify that the snapshot id is the 
same for
+# all columns.
+insert into test_ice values (1, 1, "one"), (1, 2, "two");
+compute stats test_ice;
+describe formatted test_ice;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'','impala.computeStatsSnapshotIds','1-3:(\d+)\s*'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Compute stats for only the first and third columns, verify that they have 
the same
+# snapshot id.
+insert into test_ice values (10, 10, "ten");
+compute stats test_ice(i, s);
+describe formatted test_ice;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'','impala.computeStatsSnapshotIds','1:(\d+),2:(\d+),3:\1\s*'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Compute stats for only the second and third columns, verify that they have 
the same
+# snapshot id.
+insert into test_ice values (100, 100, "hundred");
+compute stats test_ice(b, s);
+describe formatted test_ice;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'','impala.computeStatsSnapshotIds','1:(\d+),2-3:(\d+)\s*'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+drop stats test_ice;
+describe formatted test_ice;
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:'','impala.computeStatsSnapshotIds','.*'
+---- TYPES
+STRING,STRING,STRING
+====

Reply via email to