This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new e5919f13f IMPALA-13370: Read Puffin stats from metadata.json property 
if available
e5919f13f is described below

commit e5919f13f93ae6e5cfa9fb01219bddb84c9cc474
Author: Daniel Becker <[email protected]>
AuthorDate: Wed Oct 2 16:56:33 2024 +0200

    IMPALA-13370: Read Puffin stats from metadata.json property if available
    
    When Trino writes Puffin stats for a column, it includes the NDV as a
    property (with key "ndv") in the "statistics" section of the
    metadata.json file, in addition to the Theta sketch in the Puffin file.
    When we are only reading the stats and not writing/updating them, it is
    enough to read this property if it is present.
    
    After this change, Impala only opens and reads a Puffin stats file if it
    contains stats for at least one column for which the "ndv" property is
    not set in the metadata.json file.
    
    Testing:
     - added a test in test_iceberg_with_puffin.py that verifies that the
       Puffin stats file is not read if the metadata.json file contains
       the NDV property. It uses the newly added stats file with corrupt
       datasketches: 'metadata_ndv_ok_sketches_corrupt.stats'.
    
    Change-Id: I5e92056ce97c4849742db6309562af3b575f647b
    Reviewed-on: http://gerrit.cloudera.org:8080/21959
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/catalog/PuffinStatsLoader.java   | 230 +++++++++++++++++----
 .../puffindatagenerator/PuffinDataGenerator.java   |  94 +++++++--
 .../metadata_ndv_ok_sketches_corrupt.stats         | Bin 0 -> 485 bytes
 ...etadata_ndv_ok_stats_file_corrupt.metadata.json | 194 +++++++++++++++++
 .../generated/multiple_field_ids.metadata.json     | 186 +++++++++++++++++
 .../ice_puffin/generated/multiple_field_ids.stats  | Bin 0 -> 303 bytes
 tests/custom_cluster/test_iceberg_with_puffin.py   |  14 ++
 7 files changed, 671 insertions(+), 47 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java
index 29ec83672..9f50815f8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PuffinStatsLoader.java
@@ -20,8 +20,10 @@ package org.apache.impala.catalog;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
@@ -32,18 +34,22 @@ import org.apache.datasketches.theta.Sketches;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.puffin.BlobMetadata;
 import org.apache.iceberg.puffin.FileMetadata;
 import org.apache.iceberg.puffin.Puffin;
 import org.apache.iceberg.puffin.PuffinReader;
 import org.apache.iceberg.StatisticsFile;
-import org.apache.iceberg.util.Pair;
 
 import org.apache.impala.common.FileSystemUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+// In Iceberg, 'org.apache.iceberg.BlobMetadata' is an interface for metadata 
blobs.
+// 'StatisticsFile' objects obtained from the "metadata.json" file contain 
implementors of
+// this interface. 'org.apache.iceberg.puffin.BlobMetadata' is a class that 
represents
+// blob metadata present in Puffin files. This latter class unfortunately does 
not
+// implement the 'org.apache.iceberg.BlobMetadata' interface but can be 
converted to
+// org.apache.iceberg.GenericBlobMetadata, which does.
 public class PuffinStatsLoader {
   private static final Logger LOG = 
LoggerFactory.getLogger(PuffinStatsLoader.class);
 
@@ -53,10 +59,12 @@ public class PuffinStatsLoader {
 
   public static class PuffinStatsRecord {
     public final StatisticsFile file;
+    public final boolean isFromMetadataJson;
     public final long ndv;
 
-    public PuffinStatsRecord(StatisticsFile file, long ndv) {
+    public PuffinStatsRecord(StatisticsFile file, boolean isFromMetadataJson, 
long ndv) {
       this.file = file;
+      this.isFromMetadataJson = isFromMetadataJson;
       this.ndv = ndv;
     }
   }
@@ -71,46 +79,135 @@ public class PuffinStatsLoader {
     PuffinStatsLoader loader = new PuffinStatsLoader(iceApiTable, tblName);
 
     final List<StatisticsFile> statsFiles = iceApiTable.statisticsFiles();
+
+    for (StatisticsFile statsFile : statsFiles) {
+      loader.loadStatsFromMetadata(statsFile);
+    }
     for (StatisticsFile statsFile : statsFiles) {
       loader.loadStatsFromFile(statsFile);
     }
+
     return loader.result_;
   }
 
+  // Checks the metadata of 'statsFile' and loads NDV values where available.
+  private void loadStatsFromMetadata(StatisticsFile statsFile) {
+    final long currentSnapshotId = iceApiTable_.currentSnapshot().snapshotId();
+    if (statsFile.snapshotId() != currentSnapshotId) return;
+
+    List<org.apache.iceberg.BlobMetadata> metadataBlobs =
+        getBlobsFromMetadataJsonSection(statsFile, currentSnapshotId);
+    for (org.apache.iceberg.BlobMetadata mBlob : metadataBlobs) {
+      Preconditions.checkState(mBlob.fields().size() == 1);
+      int fieldId = mBlob.fields().get(0);
+
+      PuffinStatsRecord existingRecord = result_.get(fieldId);
+      if (existingRecord != null) {
+        logDuplicateStat(fieldId, existingRecord.file.path(), statsFile.path(),
+            existingRecord.ndv);
+      } else {
+        long ndv = getNdvFromMetadata(mBlob);
+        if (ndv != -1) {
+          PuffinStatsRecord record = new PuffinStatsRecord(statsFile, true, 
ndv);
+          addStatsRecordToResult(fieldId, record, null);
+        }
+      }
+    }
+  }
+
+  // Loads NDV values from the Puffin file referenced by 'statsFile' for field 
ids for
+  // which an NDV value has not already been loaded.
   private void loadStatsFromFile(StatisticsFile statsFile) {
     final long currentSnapshotId = iceApiTable_.currentSnapshot().snapshotId();
     if (statsFile.snapshotId() != currentSnapshotId) return;
 
+    List<Integer> fieldIdsToRead = collectFieldIdsToRead(statsFile);
+    if (fieldIdsToRead.isEmpty()) return;
+
     // Keep track of the Iceberg column field ids for which we read statistics 
from this
     // Puffin file. If we run into an error reading the contents of the file, 
the file may
     // be corrupt so we want to remove values already read from it from the 
overall
     // result.
     List<Integer> fieldIdsFromFile = new ArrayList<>();
+
     try {
       PuffinReader puffinReader = createPuffinReader(statsFile);
-      List<BlobMetadata> blobs = getBlobs(puffinReader, currentSnapshotId);
+      List<org.apache.iceberg.puffin.BlobMetadata> blobs = 
getBlobsFromPuffinFile(
+          puffinReader, currentSnapshotId, fieldIdsToRead, statsFile.path());
 
       // The 'UncheckedIOException' can be thrown from the 'next()' method of 
the
       // iterator. Statistics that are loaded successfully before an exception 
is thrown
       // are discarded because the file is probably corrupt.
-      for (Pair<BlobMetadata, ByteBuffer> puffinData: 
puffinReader.readAll(blobs)) {
-        BlobMetadata blobMetadata = puffinData.first();
+      for (org.apache.iceberg.util.Pair<
+              org.apache.iceberg.puffin.BlobMetadata, ByteBuffer> puffinData
+          : puffinReader.readAll(blobs)) {
+        org.apache.iceberg.puffin.BlobMetadata blobMetadata = 
puffinData.first();
         ByteBuffer blobData = puffinData.second();
 
         loadStatsFromBlob(blobMetadata, blobData, statsFile, fieldIdsFromFile);
       }
     } catch (NotFoundException e) {
       // 'result_' has not been touched yet.
-      logWarning(tblName_, statsFile.path(), true, e);
+      logWarningWithFile(tblName_, statsFile.path(), true, e);
     } catch (Exception e) {
       // We restore 'result_' to the previous state because the Puffin file 
may be
       // corrupt.
-      logWarning(tblName_, statsFile.path(), false, e);
+      logWarningWithFile(tblName_, statsFile.path(), false, e);
       result_.keySet().removeAll(fieldIdsFromFile);
     }
   }
 
-  private static void logWarning(String tableName, String statsFilePath,
+  private List<Integer> collectFieldIdsToRead(StatisticsFile statsFile) {
+    final long currentSnapshotId = iceApiTable_.currentSnapshot().snapshotId();
+    List<Integer> res = new ArrayList<>();
+
+    List<org.apache.iceberg.BlobMetadata> metadataBlobs =
+        getBlobsFromMetadataJsonSection(statsFile, currentSnapshotId);
+    for (org.apache.iceberg.BlobMetadata mBlob : metadataBlobs) {
+      Preconditions.checkState(mBlob.fields().size() == 1);
+      int fieldId = mBlob.fields().get(0);
+
+      PuffinStatsRecord existingRecord = result_.get(fieldId);
+      if (existingRecord == null) {
+        res.add(fieldId);
+      } else {
+        // In loadStatsFromMetadata() we only registered NDVs in 'result_' 
where it was
+        // available in the metadata.json file - if it was missing or invalid 
we didn't
+        // register it. It is possible that there are for example two Puffin 
files with
+        // NDVs for the same column, but neither of them (or only one of them) 
has an NDV
+        // value in the metadata.json file. This situation is not discovered in
+        // loadStatsFromMetadata(), so we need to check it here in order to 
log a warning.
+        boolean correspondingMetadataNdv = existingRecord.isFromMetadataJson
+            && existingRecord.file.path().equals(statsFile.path());
+        if (!correspondingMetadataNdv) {
+          logDuplicateStat(fieldId, existingRecord.file.path(), 
statsFile.path(),
+              existingRecord.ndv);
+        }
+      }
+    }
+
+    return res;
+  }
+
+  // Returns the NDV stored in the "ndv" property of 'blob' (a blob metadata entry taken
+  // from the metadata.json file), or -1 if the property is absent or cannot be parsed
+  // as a long. In the latter case a warning naming the affected column is logged.
+  private long getNdvFromMetadata(org.apache.iceberg.BlobMetadata blob) {
+    String ndvProperty = blob.properties().get("ndv");
+    if (ndvProperty == null) return -1;
+
+    try {
+      return Long.parseLong(ndvProperty);
+    } catch (NumberFormatException e) {
+      int fieldId = blob.fields().get(0);
+      Preconditions.checkNotNull(iceApiTable_.schema().findField(fieldId));
+
+      String colName = fieldIdToColName(fieldId);
+      // The format arguments must be passed to String.format() itself. Passing them to
+      // LOG.warn() leaves the two '%s' specifiers without arguments, which makes
+      // String.format() throw MissingFormatArgumentException instead of logging.
+      LOG.warn(String.format(
+          "Invalid NDV property in the statistics metadata for column %s: '%s'",
+          colName, ndvProperty));
+      return -1;
+    }
+  }
+
+  private static void logWarningWithFile(String tableName, String 
statsFilePath,
       boolean fileMissing, Exception e) {
     String missingStr = fileMissing ? "missing " : "";
     LOG.warn(String.format("Could not load Iceberg Puffin column statistics "
@@ -118,6 +215,26 @@ public class PuffinStatsLoader {
         tableName, missingStr, statsFilePath, e));
   }
 
+  private void logDuplicateStat(int fieldId,
+      String existingRecordFilePath, String newRecordFilePath, Long 
existingRecordNdv) {
+    String colName = fieldIdToColName(fieldId);
+
+    String existingNdvStr = existingRecordNdv == null ?
+        "" : String.format(" (%s)", existingRecordNdv);
+
+    if (existingRecordFilePath.equals(newRecordFilePath)) {
+      LOG.warn(String.format("Multiple NDV values from Puffin statistics file 
%s for "
+          + "column '%s' of table '%s'. Only using the first encountered 
one%s, "
+          + "ignoring the rest.",
+          existingRecordFilePath, colName, tblName_, existingNdvStr));
+    } else {
+      LOG.warn(String.format("Multiple NDV values from Puffin statistics for 
column '%s' "
+          + "of table '%s'. Ignoring new value from file %s, using old value%s 
"
+          + "from file %s.", colName, tblName_, newRecordFilePath,
+          existingNdvStr, existingRecordFilePath));
+    }
+  }
+
   private static PuffinReader createPuffinReader(StatisticsFile statsFile) {
     org.apache.iceberg.io.InputFile puffinFile = HadoopInputFile.fromLocation(
         statsFile.path(), FileSystemUtil.getConfiguration());
@@ -128,34 +245,73 @@ public class PuffinStatsLoader {
         .build();
   }
 
-  private static List<BlobMetadata> getBlobs(PuffinReader puffinReader,
-      long currentSnapshotId) throws java.io.IOException {
+  private List<org.apache.iceberg.BlobMetadata> 
getBlobsFromMetadataJsonSection(
+      StatisticsFile statsFile, long currentSnapshotId) {
+    return statsFile.blobMetadata().stream()
+        .filter(blob -> blobFilterPredicate(blob, currentSnapshotId, 
statsFile.path()))
+        .collect(Collectors.toList());
+  }
+
+  private List<org.apache.iceberg.puffin.BlobMetadata> getBlobsFromPuffinFile(
+      PuffinReader puffinReader, long currentSnapshotId, List<Integer> 
fieldIds,
+      String statsFileName) throws java.io.IOException {
     FileMetadata fileMetadata = puffinReader.fileMetadata();
-    return fileMetadata.blobs().stream()
-      .filter(blob ->
-          blob.snapshotId() == currentSnapshotId &&
-          blob.type().equals("apache-datasketches-theta-v1") &&
-          blob.inputFields().size() == 1)
-      .collect(Collectors.toList());
+
+    List<org.apache.iceberg.puffin.BlobMetadata> res = new ArrayList<>();
+    Set<Integer> fieldIdsAdded = new HashSet<>();
+    for (org.apache.iceberg.puffin.BlobMetadata blob : fileMetadata.blobs()) {
+      if 
(!blobFilterPredicate(org.apache.iceberg.GenericBlobMetadata.from(blob),
+          currentSnapshotId, null)) {
+        continue;
+      }
+
+      int fieldId = blob.inputFields().get(0);
+      if (!fieldIds.contains(fieldId)) continue;
+
+      Preconditions.checkState(!result_.containsKey(fieldId));
+
+      if (!fieldIdsAdded.contains(fieldId)) {
+        res.add(blob);
+        fieldIdsAdded.add(fieldId);
+      } else {
+        logDuplicateStat(fieldId, statsFileName, statsFileName, null);
+      }
+    }
+    return res;
   }
 
-  private void loadStatsFromBlob(BlobMetadata blobMetadata, ByteBuffer 
blobData,
-      StatisticsFile statsFile, List<Integer> fieldIdsFromFile) {
-    Preconditions.checkState(blobMetadata.inputFields().size() == 1);
-    int fieldId = blobMetadata.inputFields().get(0);
+  // If 'fileName' is not null, logs a warning if the field id contained in 
the blob is
+  // invalid.
+  private boolean blobFilterPredicate(org.apache.iceberg.BlobMetadata 
blobMetadata,
+      long currentSnapshotId, String fileName) {
+    if (blobMetadata.fields().size() != 1) return false;
+    if (blobMetadata.sourceSnapshotId() != currentSnapshotId) return false;
+
+    int fieldId = blobMetadata.fields().get(0);
     if (iceApiTable_.schema().findField(fieldId) == null) {
-      LOG.warn(String.format("Invalid field id %s for table '%s' found "
+      if (fileName != null) {
+        LOG.warn(String.format("Invalid field id %s for table '%s' found "
             + "in Puffin stats file '%s'. Ignoring blob.",
-            fieldId, tblName_, statsFile.path()));
-      return;
+            fieldId, tblName_, fileName));
+      }
+      return false;
     }
 
+    return blobMetadata.type().equals("apache-datasketches-theta-v1");
+  }
+
+  private void loadStatsFromBlob(org.apache.iceberg.puffin.BlobMetadata 
blobMetadata,
+      ByteBuffer blobData, StatisticsFile statsFile, List<Integer> 
fieldIdsFromFile) {
+    Preconditions.checkState(blobMetadata.inputFields().size() == 1);
+    int fieldId = blobMetadata.inputFields().get(0);
+    Preconditions.checkNotNull(iceApiTable_.schema().findField(fieldId));
+
     double ndv = -1;
     try {
       // Memory.wrap(ByteBuffer) would result in an incorrect deserialisation.
       ndv = Sketches.getEstimate(Memory.wrap(getBytes(blobData)));
     } catch (SketchesArgumentException e) {
-      String colName = iceApiTable_.schema().idToName().get(fieldId);
+      String colName = fieldIdToColName(fieldId);
       LOG.warn(String.format("Error reading datasketch for column '%s' of 
table '%s' "
           + "from Puffin stats file %s: %s", colName, tblName_, 
statsFile.path(), e));
       return;
@@ -163,19 +319,23 @@ public class PuffinStatsLoader {
     Preconditions.checkState(ndv != -1);
 
     long ndvRounded = Math.round(ndv);
-    PuffinStatsRecord record = new PuffinStatsRecord(statsFile, ndvRounded);
+    PuffinStatsRecord record = new PuffinStatsRecord(statsFile, false, 
ndvRounded);
+
+    addStatsRecordToResult(fieldId, record, fieldIdsFromFile);
+  }
 
+  private void addStatsRecordToResult(int fieldId, PuffinStatsRecord record,
+      List<Integer> fieldIdsFromFile) {
     PuffinStatsRecord prevRecord = result_.putIfAbsent(fieldId, record);
+    // Duplicate stats are detected earlier.
+    Preconditions.checkState(prevRecord == null);
+    if (!record.isFromMetadataJson) fieldIdsFromFile.add(fieldId);
+  }
 
-    if (prevRecord == null) {
-      fieldIdsFromFile.add(fieldId);
-    } else {
-      String colName = iceApiTable_.schema().idToName().get(fieldId);
-      LOG.warn(String.format("Multiple NDV values from Puffin statistics for 
column '%s' "
-          + "of table '%s'. Old value (from file %s): %s; new value (from file 
%s): %s. "
-          + "Using the old value.", colName, tblName_, prevRecord.file.path(),
-          prevRecord.ndv, record.file.path(), record.ndv));
-    }
+  private String fieldIdToColName(int fieldId) {
+    String colName = iceApiTable_.schema().idToName().get(fieldId);
+    Preconditions.checkNotNull(colName);
+    return colName;
   }
 
   // Gets the bytes from the provided 'ByteBuffer' without advancing buffer 
position. The
diff --git 
a/java/puffin-data-generator/src/main/java/org/apache/impala/puffindatagenerator/PuffinDataGenerator.java
 
b/java/puffin-data-generator/src/main/java/org/apache/impala/puffindatagenerator/PuffinDataGenerator.java
index c599262a7..ebd4910f0 100644
--- 
a/java/puffin-data-generator/src/main/java/org/apache/impala/puffindatagenerator/PuffinDataGenerator.java
+++ 
b/java/puffin-data-generator/src/main/java/org/apache/impala/puffindatagenerator/PuffinDataGenerator.java
@@ -27,11 +27,12 @@ import java.io.InputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Scanner;
@@ -146,14 +147,18 @@ public class PuffinDataGenerator {
     generator.writeFileContainsInvalidFieldId();
     generator.writeStatForUnsupportedType();
     generator.writeFileWithInvalidAndCorruptSketches();
+    generator.writeFileMetadataNdvOkFileCorrupt();
+    generator.writeFileMultipleFieldIds();
   }
 
   public PuffinDataGenerator(String metadataJsonTemplatePath, String 
localOutputDir)
       throws java.io.FileNotFoundException, JsonProcessingException {
     localOutputDir_ = localOutputDir;
 
-    String metadataJsonStr = new Scanner(new File(metadataJsonTemplatePath))
-        .useDelimiter("\\Z").next();
+    String metadataJsonStr;
+    try (Scanner scanner = new Scanner(new File(metadataJsonTemplatePath))) {
+      metadataJsonStr = scanner.useDelimiter("\\Z").next();
+    }
 
     snapshotId_ = getSnapshotIdFromMetadataJson(metadataJsonStr);
 
@@ -398,13 +403,7 @@ public class PuffinDataGenerator {
         SEQUENCE_NUMBER, sketches.get(1)));
 
     blobs.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 3, 3));
-
-    // Corrupt sketch.
-    byte[] bytes = {0, 0};
-    ByteBuffer corruptSketch = ByteBuffer.wrap(bytes);
-    blobs.add(new Blob(SKETCH_TYPE, Arrays.asList(4), snapshotId_, 
SEQUENCE_NUMBER,
-        corruptSketch));
-
+    blobs.add(createBlobCorruptSketch(snapshotId_, SEQUENCE_NUMBER, 4, 4, 
false));
     blobs.add(createBlob(snapshotId_, SEQUENCE_NUMBER, 5, 5));
 
     FileData fileData = new FileData(
@@ -414,6 +413,37 @@ public class PuffinDataGenerator {
     writeFilesForScenario(puffinFiles, 
"invalidAndCorruptSketches.metadata.json");
   }
 
+  private void writeFileMetadataNdvOkFileCorrupt() throws IOException {
+    // The sketches in the Puffin file are corrupt but it shouldn't cause an 
error since
+    // we don't actually read it because we read the NDV value from the 
metadata.json
+    // file.
+    List<Blob> blobs = new ArrayList<>();
+    blobs.add(createBlobCorruptSketch(snapshotId_, SEQUENCE_NUMBER, 1, 1, 
true));
+    blobs.add(createBlobCorruptSketch(snapshotId_, SEQUENCE_NUMBER, 2, 2, 
true));
+
+    FileData corruptFile = new FileData(
+        "metadata_ndv_ok_sketches_corrupt.stats", snapshotId_, blobs, true);
+
+    List<FileData> puffinFiles = new ArrayList<>();
+    puffinFiles.add(corruptFile);
+    writeFilesForScenario(puffinFiles,
+        "metadata_ndv_ok_stats_file_corrupt.metadata.json");
+  }
+
+  private void writeFileMultipleFieldIds() throws IOException {
+    List<Blob> blobs = new ArrayList<>();
+    List<Integer> fieldIds = Arrays.asList(1, 2);
+    blobs.add(createBlobMultipleFieldIds(snapshotId_, SEQUENCE_NUMBER, 
fieldIds, 1,
+        true));
+
+    FileData file = new FileData(
+        "multiple_field_ids.stats", snapshotId_, blobs, true);
+
+    List<FileData> puffinFiles = new ArrayList<>();
+    puffinFiles.add(file);
+    writeFilesForScenario(puffinFiles, "multiple_field_ids.metadata.json");
+  }
+
   private static ByteBuffer createSketchWithNdv(int ndv) {
     UpdateSketch sketch = UpdateSketch.builder().build();
     for (int i = 0; i < ndv; i++) sketch.update(i);
@@ -430,8 +460,39 @@ public class PuffinDataGenerator {
 
   private static Blob createBlob(long snapshotId, long sequenceNumber,
       int fieldId, int ndv) {
-    return new Blob(SKETCH_TYPE, Arrays.asList(fieldId), snapshotId, 
sequenceNumber,
-        sketches.get(ndv-1));
+    return createBlob(snapshotId, sequenceNumber, fieldId, ndv, false);
+  }
+
+  private static Blob createBlob(long snapshotId, long sequenceNumber,
+      int fieldId, int ndv, boolean addNdvProperty) {
+    return createBlobMultipleFieldIds(snapshotId, sequenceNumber, 
Arrays.asList(fieldId),
+        ndv, addNdvProperty);
+  }
+
+  private static Blob createBlobCorruptSketch(long snapshotId, long 
sequenceNumber,
+      int fieldId, int ndv, boolean addNdvProperty) {
+    // Corrupt sketch.
+    byte[] bytes = {0, 0};
+    ByteBuffer corruptSketch = ByteBuffer.wrap(bytes);
+
+    return createBlobWithProperties(snapshotId, sequenceNumber, 
Arrays.asList(fieldId),
+        ndv, corruptSketch, addNdvProperty);
+  }
+
+  private static Blob createBlobMultipleFieldIds(long snapshotId, long 
sequenceNumber,
+      List<Integer> fieldIds, int ndv, boolean addNdvProperty) {
+    return createBlobWithProperties(snapshotId, sequenceNumber, fieldIds, ndv,
+        sketches.get(ndv-1), addNdvProperty);
+  }
+
+  private static Blob createBlobWithProperties(long snapshotId, long 
sequenceNumber,
+      List<Integer> fieldIds, int ndv, ByteBuffer datasketch, boolean 
addNdvProperty) {
+    Map<String, String> properties = new HashMap<>();
+    if (addNdvProperty) {
+      properties.put("ndv", Integer.toString(ndv));
+    }
+    return new Blob(SKETCH_TYPE, fieldIds, snapshotId, sequenceNumber,
+        datasketch, null, properties);
   }
 
   private void writeFilesForScenario(List<FileData> puffinFiles, String 
statsJsonFile)
@@ -498,6 +559,15 @@ public class PuffinDataGenerator {
     for (int fieldId : blob.inputFields()) fieldsList.add(fieldId);
     blobNode.set("fields", fieldsList);
 
+    // Put properties
+    if (!blob.properties().isEmpty()) {
+      ObjectNode properties = mapper_.createObjectNode();
+      for (Map.Entry<String, String> entry : blob.properties().entrySet()) {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+      blobNode.set("properties", properties);
+    }
+
     return blobNode;
   }
 
diff --git 
a/testdata/ice_puffin/generated/metadata_ndv_ok_sketches_corrupt.stats 
b/testdata/ice_puffin/generated/metadata_ndv_ok_sketches_corrupt.stats
new file mode 100644
index 000000000..2dc796a63
Binary files /dev/null and 
b/testdata/ice_puffin/generated/metadata_ndv_ok_sketches_corrupt.stats differ
diff --git 
a/testdata/ice_puffin/generated/metadata_ndv_ok_stats_file_corrupt.metadata.json
 
b/testdata/ice_puffin/generated/metadata_ndv_ok_stats_file_corrupt.metadata.json
new file mode 100644
index 000000000..0452002f8
--- /dev/null
+++ 
b/testdata/ice_puffin/generated/metadata_ndv_ok_stats_file_corrupt.metadata.json
@@ -0,0 +1,194 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "UUID_PLACEHOLDER",
+  "location" : "TABLE_LOCATION_PLACEHOLDER",
+  "last-updated-ms" : 1728483427091,
+  "last-column-id" : 10,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col1",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "int_col2",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 3,
+      "name" : "bigint_col",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 4,
+      "name" : "float_col",
+      "required" : false,
+      "type" : "float"
+    }, {
+      "id" : 5,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 6,
+      "name" : "decimal_col",
+      "required" : false,
+      "type" : "decimal(9, 0)"
+    }, {
+      "id" : 7,
+      "name" : "date_col",
+      "required" : false,
+      "type" : "date"
+    }, {
+      "id" : 8,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 9,
+      "name" : "timestamp_col",
+      "required" : false,
+      "type" : "timestamp"
+    }, {
+      "id" : 10,
+      "name" : "bool_col",
+      "required" : false,
+      "type" : "boolean"
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col1",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "int_col2",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 3,
+      "name" : "bigint_col",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 4,
+      "name" : "float_col",
+      "required" : false,
+      "type" : "float"
+    }, {
+      "id" : 5,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 6,
+      "name" : "decimal_col",
+      "required" : false,
+      "type" : "decimal(9, 0)"
+    }, {
+      "id" : 7,
+      "name" : "date_col",
+      "required" : false,
+      "type" : "date"
+    }, {
+      "id" : 8,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 9,
+      "name" : "timestamp_col",
+      "required" : false,
+      "type" : "timestamp"
+    }, {
+      "id" : 10,
+      "name" : "bool_col",
+      "required" : false,
+      "type" : "boolean"
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "engine.hive.enabled" : "true",
+    "impala.events.catalogServiceId" : "95d77e9a4a3d4e27:96a993b690a3fba6",
+    "external.table.purge" : "TRUE",
+    "impala.events.catalogVersion" : "2108",
+    "write.format.default" : "parquet",
+    "hive.metastore.table.owner" : "danielbecker",
+    "OBJCAPABILITIES" : "EXTREAD,EXTWRITE",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
+  },
+  "current-snapshot-id" : 2532372403033748214,
+  "refs" : {
+    "main" : {
+      "snapshot-id" : 2532372403033748214,
+      "type" : "branch"
+    }
+  },
+  "snapshots" : [ {
+    "snapshot-id" : 2532372403033748214,
+    "timestamp-ms" : 1728483427081,
+    "summary" : {
+      "operation" : "delete",
+      "changed-partition-count" : "0",
+      "total-records" : "0",
+      "total-files-size" : "0",
+      "total-data-files" : "0",
+      "total-delete-files" : "0",
+      "total-position-deletes" : "0",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : 
"TABLE_LOCATION_PLACEHOLDER/metadata/snap-2532372403033748214-1-c9f94c00-2920-4a39-8e7f-c7faf7e71a7d.avro",
+    "schema-id" : 0
+  } ],
+  "statistics" : [ {
+    "snapshot-id" : 2532372403033748214,
+    "statistics-path" : 
"TABLE_LOCATION_PLACEHOLDER/metadata/metadata_ndv_ok_sketches_corrupt.stats",
+    "file-size-in-bytes" : 485,
+    "file-footer-size-in-bytes" : 451,
+    "blob-metadata" : [ {
+      "type" : "apache-datasketches-theta-v1",
+      "snapshot-id" : 2532372403033748214,
+      "sequence-number" : 0,
+      "fields" : [ 1 ],
+      "properties" : {
+        "ndv" : "1"
+      }
+    }, {
+      "type" : "apache-datasketches-theta-v1",
+      "snapshot-id" : 2532372403033748214,
+      "sequence-number" : 0,
+      "fields" : [ 2 ],
+      "properties" : {
+        "ndv" : "2"
+      }
+    } ]
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1728483427081,
+    "snapshot-id" : 2532372403033748214
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1728483321332,
+    "metadata-file" : 
"TABLE_LOCATION_PLACEHOLDER/metadata/00000-0991de67-dc39-4226-af9c-8f347dbd57e5.metadata.json"
+  } ]
+}
\ No newline at end of file
diff --git a/testdata/ice_puffin/generated/multiple_field_ids.metadata.json 
b/testdata/ice_puffin/generated/multiple_field_ids.metadata.json
new file mode 100644
index 000000000..3dd8e0a0e
--- /dev/null
+++ b/testdata/ice_puffin/generated/multiple_field_ids.metadata.json
@@ -0,0 +1,186 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "UUID_PLACEHOLDER",
+  "location" : "TABLE_LOCATION_PLACEHOLDER",
+  "last-updated-ms" : 1728483427091,
+  "last-column-id" : 10,
+  "schema" : {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col1",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "int_col2",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 3,
+      "name" : "bigint_col",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 4,
+      "name" : "float_col",
+      "required" : false,
+      "type" : "float"
+    }, {
+      "id" : 5,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 6,
+      "name" : "decimal_col",
+      "required" : false,
+      "type" : "decimal(9, 0)"
+    }, {
+      "id" : 7,
+      "name" : "date_col",
+      "required" : false,
+      "type" : "date"
+    }, {
+      "id" : 8,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 9,
+      "name" : "timestamp_col",
+      "required" : false,
+      "type" : "timestamp"
+    }, {
+      "id" : 10,
+      "name" : "bool_col",
+      "required" : false,
+      "type" : "boolean"
+    } ]
+  },
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "int_col1",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "int_col2",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 3,
+      "name" : "bigint_col",
+      "required" : false,
+      "type" : "long"
+    }, {
+      "id" : 4,
+      "name" : "float_col",
+      "required" : false,
+      "type" : "float"
+    }, {
+      "id" : 5,
+      "name" : "double_col",
+      "required" : false,
+      "type" : "double"
+    }, {
+      "id" : 6,
+      "name" : "decimal_col",
+      "required" : false,
+      "type" : "decimal(9, 0)"
+    }, {
+      "id" : 7,
+      "name" : "date_col",
+      "required" : false,
+      "type" : "date"
+    }, {
+      "id" : 8,
+      "name" : "string_col",
+      "required" : false,
+      "type" : "string"
+    }, {
+      "id" : 9,
+      "name" : "timestamp_col",
+      "required" : false,
+      "type" : "timestamp"
+    }, {
+      "id" : 10,
+      "name" : "bool_col",
+      "required" : false,
+      "type" : "boolean"
+    } ]
+  } ],
+  "partition-spec" : [ ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "engine.hive.enabled" : "true",
+    "impala.events.catalogServiceId" : "95d77e9a4a3d4e27:96a993b690a3fba6",
+    "external.table.purge" : "TRUE",
+    "impala.events.catalogVersion" : "2108",
+    "write.format.default" : "parquet",
+    "hive.metastore.table.owner" : "danielbecker",
+    "OBJCAPABILITIES" : "EXTREAD,EXTWRITE",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
+  },
+  "current-snapshot-id" : 2532372403033748214,
+  "refs" : {
+    "main" : {
+      "snapshot-id" : 2532372403033748214,
+      "type" : "branch"
+    }
+  },
+  "snapshots" : [ {
+    "snapshot-id" : 2532372403033748214,
+    "timestamp-ms" : 1728483427081,
+    "summary" : {
+      "operation" : "delete",
+      "changed-partition-count" : "0",
+      "total-records" : "0",
+      "total-files-size" : "0",
+      "total-data-files" : "0",
+      "total-delete-files" : "0",
+      "total-position-deletes" : "0",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : 
"TABLE_LOCATION_PLACEHOLDER/metadata/snap-2532372403033748214-1-c9f94c00-2920-4a39-8e7f-c7faf7e71a7d.avro",
+    "schema-id" : 0
+  } ],
+  "statistics" : [ {
+    "snapshot-id" : 2532372403033748214,
+    "statistics-path" : 
"TABLE_LOCATION_PLACEHOLDER/metadata/multiple_field_ids.stats",
+    "file-size-in-bytes" : 303,
+    "file-footer-size-in-bytes" : 270,
+    "blob-metadata" : [ {
+      "type" : "apache-datasketches-theta-v1",
+      "snapshot-id" : 2532372403033748214,
+      "sequence-number" : 0,
+      "fields" : [ 1, 2 ],
+      "properties" : {
+        "ndv" : "1"
+      }
+    } ]
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1728483427081,
+    "snapshot-id" : 2532372403033748214
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1728483321332,
+    "metadata-file" : 
"TABLE_LOCATION_PLACEHOLDER/metadata/00000-0991de67-dc39-4226-af9c-8f347dbd57e5.metadata.json"
+  } ]
+}
\ No newline at end of file
diff --git a/testdata/ice_puffin/generated/multiple_field_ids.stats 
b/testdata/ice_puffin/generated/multiple_field_ids.stats
new file mode 100644
index 000000000..334d74195
Binary files /dev/null and 
b/testdata/ice_puffin/generated/multiple_field_ids.stats differ
diff --git a/tests/custom_cluster/test_iceberg_with_puffin.py 
b/tests/custom_cluster/test_iceberg_with_puffin.py
index f0e12123e..e14f4cd35 100644
--- a/tests/custom_cluster/test_iceberg_with_puffin.py
+++ b/tests/custom_cluster/test_iceberg_with_puffin.py
@@ -121,6 +121,8 @@ class 
TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
     self._check_file_contains_invalid_field_id(tbl_name, tbl_loc, 
metadata_json_path)
     self._check_stats_for_unsupported_type(tbl_name, tbl_loc, 
metadata_json_path)
     self._check_invalid_and_corrupt_sketches(tbl_name, tbl_loc, 
metadata_json_path)
+    self._check_metadata_ndv_ok_file_corrupt(tbl_name, tbl_loc, 
metadata_json_path)
+    self._check_multiple_field_ids_for_blob(tbl_name, tbl_loc, 
metadata_json_path)
 
     # Disable reading Puffin stats with a table property.
     disable_puffin_reading_tbl_prop_stmt = "alter table {} set tblproperties( \
@@ -231,6 +233,18 @@ class 
TestIcebergTableWithPuffinStats(CustomClusterTestSuite):
         "invalidAndCorruptSketches.metadata.json",
         [1, -1, 3, -1, 5, -1, -1, -1, 2000, -1])
 
+  def _check_metadata_ndv_ok_file_corrupt(self, tbl_name, tbl_loc, 
metadata_json_path):
+    # The Puffin file is corrupt but it shouldn't cause an error since we 
don't actually
+    # read it because we read the NDV value from the metadata.json file.
+    self._check_scenario(tbl_name, tbl_loc, metadata_json_path,
+        "metadata_ndv_ok_stats_file_corrupt.metadata.json",
+        [1, 2, -1, -1, -1, -1, -1, -1, 2000, -1])
+
+  def _check_multiple_field_ids_for_blob(self, tbl_name, tbl_loc, 
metadata_json_path):
+    self._check_scenario(tbl_name, tbl_loc, metadata_json_path,
+        "multiple_field_ids.metadata.json",
+        [-1, -1, -1, -1, -1, -1, -1, -1, 2000, -1])
+
   def _check_reading_puffin_stats_disabled_by_tbl_prop(self, tbl_name, tbl_loc,
           metadata_json_path):
     self._check_scenario(tbl_name, tbl_loc, metadata_json_path,


Reply via email to