This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cad08b1f2cfd feat(lance): Support bloom filter in Lance writer and reader (#18304)
cad08b1f2cfd is described below

commit cad08b1f2cfd637db050d37dee8fd9efddd5fc6a
Author: Vova Kolmakov <[email protected]>
AuthorDate: Thu Mar 19 09:00:51 2026 +0700

    feat(lance): Support bloom filter in Lance writer and reader (#18304)
---
 .../io/storage/HoodieSparkFileWriterFactory.java   |  4 +-
 .../hudi/io/storage/HoodieSparkLanceReader.java    | 68 ++++++++++++++++++----
 .../hudi/io/storage/HoodieSparkLanceWriter.java    | 31 +++++-----
 .../row/HoodieBloomFilterRowWriteSupport.java      | 58 ++++++++++++++++++
 .../storage/row/HoodieRowParquetWriteSupport.java  | 29 ---------
 .../hudi/io/lance/HoodieBaseLanceWriter.java       | 20 ++++++-
 .../io/storage/TestHoodieSparkLanceReader.java     | 36 +++++++++++-
 .../io/storage/TestHoodieSparkLanceWriter.java     | 22 ++++---
 8 files changed, 201 insertions(+), 67 deletions(-)
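
The change persists the serialized bloom filter, its type code, and the min/max record keys in the Lance file's Arrow schema custom metadata, reusing the footer keys from HoodieBloomFilterWriteSupport that Hudi already writes for other base file formats. A minimal round-trip sketch, modeled on the tests in this commit (the path is illustrative; schema, instantTime, taskContextSupplier, storage, and row are the usual test fixtures, not shown here):

    // Filter sizing mirrors the tests: 1000 entries, 0.01% FPP, 10000 max entries
    BloomFilter bloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, BloomFilterTypeCode.SIMPLE.name());
    StoragePath path = new StoragePath("/tmp/example.lance");
    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(bloomFilter))) {
      writer.writeRow("key0", row);  // each record key is also fed into the bloom filter
    }
    // The reader rebuilds both footers from schema metadata, without scanning data
    HoodieSparkLanceReader reader = new HoodieSparkLanceReader(path);
    BloomFilter restored = reader.readBloomFilter();
    String[] minMax = reader.readMinMaxRecordKeys();  // {"key0", "key0"}
    reader.close();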

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index bd4ba955d04a..9ebb94a0bb54 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -109,8 +109,10 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
                                                 TaskContextSupplier taskContextSupplier) throws IOException {
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
     StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+    boolean enableBloomFilter = enableBloomFilter(populateMetaFields, config);
+    Option<BloomFilter> bloomFilter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
 
-    return new HoodieSparkLanceWriter(path, structType, instantTime, taskContextSupplier, storage, populateMetaFields);
+    return new HoodieSparkLanceWriter(path, structType, instantTime, taskContextSupplier, storage, populateMetaFields, bloomFilter);
   }
 
   private static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema schema,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index f99bf2ce9eac..3bf6625a4fd5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -20,6 +20,8 @@ package org.apache.hudi.io.storage;
 
 import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.bloom.SimpleBloomFilter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
@@ -32,6 +34,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.memory.HoodieArrowAllocator;
 import org.apache.hudi.storage.StoragePath;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -46,13 +49,19 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 /**
  * {@link HoodieSparkFileReader} implementation for Lance file format.
  */
+@Slf4j
 public class HoodieSparkLanceReader implements HoodieSparkFileReader {
   // Memory size for data read operations: 120MB
   public static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024;
@@ -63,19 +72,52 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
   // number of rows to read
   private static final int DEFAULT_BATCH_SIZE = 512;
   private final StoragePath path;
+  private final BufferAllocator metadataAllocator;
+  private final LanceFileReader lanceMetadataReader;
+  private final Schema arrowSchema;
 
   public HoodieSparkLanceReader(StoragePath path) {
     this.path = path;
+    metadataAllocator = HoodieArrowAllocator.newChildAllocator(
+        getClass().getSimpleName() + "-metadata-" + path.getName(), LANCE_METADATA_ALLOCATOR_SIZE);
+    try {
+      lanceMetadataReader = LanceFileReader.open(path.toString(), metadataAllocator);
+      arrowSchema = lanceMetadataReader.schema();
+    } catch (Exception e) {
+      close();
+      throw new HoodieException("Failed to create lanceMetadataReader: " + path, e);
+    }
   }
 
   @Override
   public String[] readMinMaxRecordKeys() {
-    throw new UnsupportedOperationException("Min/max record key tracking is 
not yet supported for Lance file format");
+    Map<String, String> metadata = arrowSchema.getCustomMetadata();
+    if (metadata != null && !metadata.isEmpty()) {
+      String minKey = metadata.get(HOODIE_MIN_RECORD_KEY_FOOTER);
+      String maxKey = metadata.get(HOODIE_MAX_RECORD_KEY_FOOTER);
+      if (minKey != null && maxKey != null) {
+        return new String[] { minKey, maxKey };
+      }
+    }
+    throw new HoodieException("Could not read min/max record key out of Lance 
file: " + path);
   }
 
   @Override
   public BloomFilter readBloomFilter() {
-    throw new UnsupportedOperationException("Bloom filter is not yet supported 
for Lance file format");
+    BloomFilter toReturn = null;
+    Map<String, String> metadata = arrowSchema.getCustomMetadata();
+    if (metadata != null && !metadata.isEmpty()) {
+      if (metadata.containsKey(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) {
+        String bloomSer = metadata.get(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+        String filterType = metadata.get(HOODIE_BLOOM_FILTER_TYPE_CODE);
+        if (filterType != null && filterType.contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+          toReturn = new HoodieDynamicBoundedBloomFilter(bloomSer);
+        } else {
+          toReturn = new SimpleBloomFilter(bloomSer);
+        }
+      }
+    }
+    return toReturn;
   }
 
   @Override
@@ -159,10 +201,7 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
 
   @Override
   public HoodieSchema getSchema() {
-    try (BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
-             getClass().getSimpleName() + "-metadata-" + path.getName(), LANCE_METADATA_ALLOCATOR_SIZE);
-         LanceFileReader reader = LanceFileReader.open(path.toString(), allocator)) {
-      Schema arrowSchema = reader.schema();
+    try {
       StructType structType = LanceArrowUtils.fromArrowSchema(arrowSchema);
       return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "record", "", false);
     } catch (Exception e) {
@@ -172,15 +211,22 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
 
   @Override
   public void close() {
-    // noop as resources are managed by the LanceRecordIterator and not within this reader class.
+    if (lanceMetadataReader != null) {
+      try {
+        lanceMetadataReader.close();
+      } catch (Exception e) {
+        log.warn("Error while closing lanceMetadataReader: {}", e.getMessage());
+      }
+    }
+      }
+    }
+    if (metadataAllocator != null) {
+      metadataAllocator.close();
+    }
   }
 
   @Override
   public long getTotalRecords() {
-    try (BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
-             getClass().getSimpleName() + "-metadata-" + path.getName(), LANCE_METADATA_ALLOCATOR_SIZE);
-         LanceFileReader reader = LanceFileReader.open(path.toString(), allocator)) {
-      return reader.numRows();
+    try {
+      return lanceMetadataReader.numRows();
     } catch (Exception e) {
       throw new HoodieException("Failed to get row count from Lance file: " + path, e);
     }
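
Because both footers come straight from the Arrow schema, a caller can prune a Lance file during key lookup without reading any data pages. Note the asymmetry above: readMinMaxRecordKeys() throws when the footers are absent, while readBloomFilter() returns null. A hedged caller-side sketch (candidateKey and the reader are illustrative; mightContain() may false-positive but never false-negative):

    String[] minMax = reader.readMinMaxRecordKeys();
    boolean inRange = candidateKey.compareTo(minMax[0]) >= 0
        && candidateKey.compareTo(minMax[1]) <= 0;
    // readBloomFilter() returns null when the file carries no filter metadata
    BloomFilter filter = reader.readBloomFilter();
    boolean maybePresent = inRange && filter != null && filter.mightContain(candidateKey);
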
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index 08aae1979d8e..e029fcbab010 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -18,10 +18,13 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.io.storage.row.HoodieBloomFilterRowWriteSupport;
 import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -49,10 +52,11 @@ import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECO
  *
  * This writer integrates with Hudi's storage I/O layer and supports:
  * - Hudi metadata field population
- * - Record key tracking (for bloom filters - TODO https://github.com/apache/hudi/issues/17664)
+ * - Record key tracking (for bloom filters)
  * - Sequence ID generation
+ * - Min/max record key tracking
  */
-public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
+public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow, UTF8String>
     implements HoodieSparkFileWriter, HoodieInternalRowFileWriter {
 
   private static final String DEFAULT_TIMEZONE = "UTC";
@@ -73,15 +77,16 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
    * @param taskContextSupplier Task context supplier for partition ID
    * @param storage HoodieStorage instance
    * @param populateMetaFields Whether to populate Hudi metadata fields
-   * @throws IOException if writer initialization fails
+   * @param bloomFilterOpt Optional bloom filter for record key tracking
    */
   public HoodieSparkLanceWriter(StoragePath file,
                                 StructType sparkSchema,
                                 String instantTime,
                                 TaskContextSupplier taskContextSupplier,
                                 HoodieStorage storage,
-                                boolean populateMetaFields) throws IOException {
-    super(file, DEFAULT_BATCH_SIZE);
+                                boolean populateMetaFields,
+                                Option<BloomFilter> bloomFilterOpt) {
+    super(file, DEFAULT_BATCH_SIZE, bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
     this.sparkSchema = sparkSchema;
     this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema, DEFAULT_TIMEZONE, true, false);
     this.fileName = UTF8String.fromString(file.getName());
@@ -100,34 +105,34 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
    * @param sparkSchema Spark schema for the data
    * @param taskContextSupplier Task context supplier for partition ID
    * @param storage HoodieStorage instance
-   * @throws IOException if writer initialization fails
    */
   public HoodieSparkLanceWriter(StoragePath file,
                                 StructType sparkSchema,
                                 TaskContextSupplier taskContextSupplier,
-                                HoodieStorage storage) throws IOException {
-    this(file, sparkSchema, null, taskContextSupplier, storage, false);
+                                HoodieStorage storage) {
+    this(file, sparkSchema, null, taskContextSupplier, storage, false, Option.empty());
   }
 
   @Override
  public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException {
+    UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+    bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> bloomFilterWriteSupport.addKey(recordKey));
     if (populateMetaFields) {
-      UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
       updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
-      super.write(row);
-    } else {
-      super.write(row);
     }
+    super.write(row);
   }
 
   @Override
   public void writeRow(String recordKey, InternalRow row) throws IOException {
+    bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+        bloomFilterWriteSupport.addKey(UTF8String.fromString(recordKey)));
     super.write(row);
   }
   
   @Override
   public void writeRow(UTF8String key, InternalRow row) throws IOException {
-    // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
+    bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> bloomFilterWriteSupport.addKey(key));
     super.write(row);
   }
   
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowWriteSupport.java
new file mode 100644
index 000000000000..4a4519a90041
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowWriteSupport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+
+import org.apache.spark.sql.HoodieUTF8StringFactory;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Bloom filter write support implementation for Lance file format.
+ * Handles UTF8String record keys specific to Spark InternalRow.
+ */
+public class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
+
+  private static final HoodieUTF8StringFactory UTF8STRING_FACTORY =
+      SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();
+
+  public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
+    super(bloomFilter);
+  }
+
+  @Override
+  protected int compareRecordKey(UTF8String a, UTF8String b) {
+    return UTF8STRING_FACTORY.wrapUTF8String(a).compareTo(UTF8STRING_FACTORY.wrapUTF8String(b));
+  }
+
+  @Override
+  protected byte[] getUTF8Bytes(UTF8String key) {
+    return key.getBytes();
+  }
+
+  @Override
+  protected UTF8String dereference(UTF8String key) {
+    // NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy the underlying buffer in
+    //       cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
+    //       and simply do a pass over when it holds an (immutable) buffer holding just the string
+    return key.clone();
+  }
+}
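
This is the UTF8String counterpart of the Avro-based write support, hoisted out of HoodieRowParquetWriteSupport (see the deletion below) so the Lance writer can share it. A standalone usage sketch, with filter sizing borrowed from the tests in this commit:

    BloomFilter bloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, BloomFilterTypeCode.SIMPLE.name());
    HoodieBloomFilterRowWriteSupport support = new HoodieBloomFilterRowWriteSupport(bloomFilter);
    support.addKey(UTF8String.fromString("key0"));
    support.addKey(UTF8String.fromString("key1"));
    // Serializes the filter, type code, and min/max keys into the footer key-value map
    Map<String, String> footer = support.finalizeMetadata();
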
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 9d0da7f534af..be2ed64ece72 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -45,7 +45,6 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
-import org.apache.spark.sql.HoodieUTF8StringFactory;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
 import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -384,34 +383,6 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
     };
   }
 
-  private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
-
-    private static final HoodieUTF8StringFactory UTF8STRING_FACTORY =
-        SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();
-
-    public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
-      super(bloomFilter);
-    }
-
-    @Override
-    protected int compareRecordKey(UTF8String a, UTF8String b) {
-      return UTF8STRING_FACTORY.wrapUTF8String(a).compareTo(UTF8STRING_FACTORY.wrapUTF8String(b));
-    }
-
-    @Override
-    protected byte[] getUTF8Bytes(UTF8String key) {
-      return key.getBytes();
-    }
-
-    @Override
-    protected UTF8String dereference(UTF8String key) {
-      // NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in
-      //       cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
-      //       and simply do a pass over when it holds a (immutable) buffer holding just the string
-      return key.clone();
-    }
-  }
-
   public static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(Configuration conf, StructType structType,
                                                                              Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
     return (HoodieRowParquetWriteSupport) ReflectionUtils.loadClass(
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
index c8f9f950ccb4..74dfcab40eba 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.io.lance;
 
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.memory.HoodieArrowAllocator;
 import org.apache.hudi.storage.StoragePath;
@@ -34,6 +36,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Base class for Hudi Lance file writers supporting different record types.
@@ -43,13 +46,14 @@ import java.io.IOException;
  * - BufferAllocator management
  * - Record buffering and batch flushing
  * - File size checks
+ * - Bloom filter metadata writing
  *
  * Subclasses must implement type-specific conversion to Arrow format.
  *
  * @param <R> The record type (e.g., GenericRecord, InternalRow)
  */
 @NotThreadSafe
-public abstract class HoodieBaseLanceWriter<R> implements Closeable {
+public abstract class HoodieBaseLanceWriter<R, K extends Comparable<K>> 
implements Closeable {
   /** Memory size for data write operations: 120MB */
   private static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024L;
 
@@ -62,6 +66,7 @@ public abstract class HoodieBaseLanceWriter<R> implements Closeable {
   private int currentBatchSize = 0;
   private VectorSchemaRoot root;
   private ArrowWriter<R> arrowWriter;
+  protected final Option<HoodieBloomFilterWriteSupport<K>> 
bloomFilterWriteSupportOpt;
 
   private LanceFileWriter writer;
 
@@ -70,12 +75,15 @@ public abstract class HoodieBaseLanceWriter<R> implements Closeable {
    *
    * @param path Path where Lance file will be written
    * @param batchSize Number of records to buffer before flushing to Lance
+   * @param bloomFilterWriteSupportOpt Optional bloom filter write support for record key tracking
    */
-  protected HoodieBaseLanceWriter(StoragePath path, int batchSize) {
+  protected HoodieBaseLanceWriter(StoragePath path, int batchSize,
+                                  Option<HoodieBloomFilterWriteSupport<K>> 
bloomFilterWriteSupportOpt) {
     this.path = path;
     this.allocator = HoodieArrowAllocator.newChildAllocator(
         getClass().getSimpleName() + "-data-" + path.getName(), LANCE_DATA_ALLOCATOR_SIZE);
     this.batchSize = batchSize;
+    this.bloomFilterWriteSupportOpt = bloomFilterWriteSupportOpt;
   }
 
   /**
@@ -152,6 +160,14 @@ public abstract class HoodieBaseLanceWriter<R> implements Closeable {
         root.setRowCount(0);
         writer.write(root);
       }
+
+      // Finalize and write bloom filter metadata
+      if (writer != null && bloomFilterWriteSupportOpt.isPresent()) {
+        Map<String, String> metadata = 
bloomFilterWriteSupportOpt.get().finalizeMetadata();
+        if (!metadata.isEmpty()) {
+          writer.addSchemaMetadata(metadata);
+        }
+      }
     } catch (Exception e) {
       primaryException = e;
     }
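
With this hunk, close() flushes any remaining buffered rows and then, if a write support is present and produced metadata, attaches the footer map to the Lance schema via writer.addSchemaMetadata() before the file is finalized. A sketch for verifying the result on a written file, assuming a BufferAllocator set up the same way as elsewhere in this commit:

    try (LanceFileReader lanceReader = LanceFileReader.open(path.toString(), allocator)) {
      Map<String, String> meta = lanceReader.schema().getCustomMetadata();
      boolean hasFilter = meta.containsKey(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
      boolean hasKeyRange = meta.containsKey(HOODIE_MIN_RECORD_KEY_FOOTER)
          && meta.containsKey(HOODIE_MAX_RECORD_KEY_FOOTER);
    }
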
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
index ae827f73d7f0..7d43430753f6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
@@ -20,10 +20,15 @@ package org.apache.hudi.io.storage;
 
 import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.bloom.SimpleBloomFilter;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
@@ -57,10 +62,13 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.bloom.BloomFilterTypeCode.DYNAMIC_V0;
+import static org.apache.hudi.common.bloom.BloomFilterTypeCode.SIMPLE;
 import static org.apache.hudi.io.storage.LanceTestUtils.createRow;
 import static org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -75,12 +83,14 @@ public class TestHoodieSparkLanceReader {
   private HoodieStorage storage;
   private SparkTaskContextSupplier taskContextSupplier;
   private String instantTime;
+  private BloomFilter simpleBloomFilter;
 
   @BeforeEach
   public void setUp() throws IOException {
     storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
     taskContextSupplier = new SparkTaskContextSupplier();
     instantTime = "20251201120000000";
+    simpleBloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, SIMPLE.name());
   }
 
   @AfterEach
@@ -294,8 +304,9 @@ public class TestHoodieSparkLanceReader {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_large.lance");
     int recordCount = 2500;
+    BloomFilter dynamicBloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, DYNAMIC_V0.name());
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(dynamicBloomFilter))) {
       for (int i = 0; i < recordCount; i++) {
         GenericInternalRow row = new GenericInternalRow(new Object[]{i, (long) i * 2});
         writer.writeRow("key" + i, row);
@@ -320,6 +331,8 @@ public class TestHoodieSparkLanceReader {
       }
 
       assertEquals(recordCount, count, "Should read all records");
+
+      assertBloomFilter(reader, HoodieDynamicBoundedBloomFilter.class, "key0", "key999", 2500);
     }
   }
 
@@ -533,6 +546,8 @@ public class TestHoodieSparkLanceReader {
         seqNos.add(row.getUTF8String(1).toString());
       }
       assertEquals(3, seqNos.size(), "All sequence numbers should be unique");
+
+      assertBloomFilter(reader, SimpleBloomFilter.class, "key0", "key2", 3);
     }
   }
 
@@ -563,7 +578,7 @@ public class TestHoodieSparkLanceReader {
     
  private HoodieSparkLanceReader writeAndCreateReader(StoragePath path, StructType schema, List<InternalRow> rows, boolean populateMetaFields) throws IOException {
    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, populateMetaFields)) {
+        path, schema, instantTime, taskContextSupplier, storage, populateMetaFields, Option.of(simpleBloomFilter))) {
       for (int i = 0; i < rows.size(); i++) {
         HoodieKey key = new HoodieKey("key" + i, "default_partition");
        // Note writeRowWithMetadata implicitly handles case where populateMetaFields=false
@@ -588,8 +603,9 @@ public class TestHoodieSparkLanceReader {
 
     // Write Lance file with full schema
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_projection.lance");
+    BloomFilter dynamicBloom = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, DYNAMIC_V0.name());
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, fullSchema, instantTime, taskContextSupplier, storage, false)) {
+        path, fullSchema, instantTime, taskContextSupplier, storage, false, Option.of(dynamicBloom))) {
       for (int i = 0; i < rows.size(); i++) {
         writer.writeRow("key" + i, rows.get(i));
       }
@@ -623,7 +639,21 @@ public class TestHoodieSparkLanceReader {
           count++;
         }
         assertEquals(3, count, "Should read all 3 records");
+
+        assertBloomFilter(reader, HoodieDynamicBoundedBloomFilter.class, "key0", "key2", 3);
       }
     }
   }
+
+  private void assertBloomFilter(HoodieSparkLanceReader reader, Class<?> clazz, String minKey, String maxKey, int keyCount) {
+    BloomFilter bloomFilter = reader.readBloomFilter();
+    assertInstanceOf(clazz, bloomFilter);
+    for (int i = 0; i < keyCount; i++) {
+      assertTrue(bloomFilter.mightContain("key" + i));
+    }
+    assertFalse(bloomFilter.mightContain("no_such_key"));
+    String[] minMaxKeys = reader.readMinMaxRecordKeys();
+    assertEquals(minKey, minMaxKeys[0]);
+    assertEquals(maxKey, minMaxKeys[1]);
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
index 7add395f913f..c1d907fce01b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
@@ -19,8 +19,11 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.io.memory.HoodieArrowAllocator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -53,6 +56,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hudi.common.bloom.BloomFilterTypeCode.SIMPLE;
 import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
@@ -77,12 +81,14 @@ public class TestHoodieSparkLanceWriter {
   private HoodieStorage storage;
   private SparkTaskContextSupplier taskContextSupplier;
   private String instantTime;
+  private BloomFilter simpleBloomFilter;
 
   @BeforeEach
   public void setUp() throws IOException {
     storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
     taskContextSupplier = new SparkTaskContextSupplier();
     instantTime = "20251201120000000";
+    simpleBloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, SIMPLE.name());
   }
 
   @AfterEach
@@ -99,7 +105,7 @@ public class TestHoodieSparkLanceWriter {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_with_metadata.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, true)) {
+        path, schema, instantTime, taskContextSupplier, storage, true, Option.of(simpleBloomFilter))) {
       // Write multiple records to test metadata population and sequence ID generation
       for (int i = 0; i < 3; i++) {
         InternalRow row = createRowWithMetaFields(i, "User" + i, 20L + i);
@@ -168,7 +174,7 @@ public class TestHoodieSparkLanceWriter {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_without_metadata.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
       // Create row with just user data (no meta fields)
       InternalRow row = createRow(1, "Bob", 25L);
       HoodieKey key = new HoodieKey("key2", "partition2");
@@ -212,7 +218,7 @@ public class TestHoodieSparkLanceWriter {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_simple_write.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
       InternalRow row = createRow(1, "Charlie", 35L);
       writer.writeRow("key3", row);
     }
@@ -233,7 +239,7 @@ public class TestHoodieSparkLanceWriter {
     // Write more than DEFAULT_BATCH_SIZE (1000) records
     int recordCount = 2500;
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
       for (int i = 0; i < recordCount; i++) {
         InternalRow row = createRow(i, "User" + i, 20L + i);
         writer.writeRow("key" + i, row);
@@ -261,7 +267,7 @@ public class TestHoodieSparkLanceWriter {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_primitives.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
       GenericInternalRow row = new GenericInternalRow(new Object[]{
           42,                                    // int
           123456789L,                           // long
@@ -308,7 +314,7 @@ public class TestHoodieSparkLanceWriter {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_nulls.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
       // Write rows with null values
       writer.writeRow("key1", createRow(1, "Alice", 30L));
       writer.writeRow("key2", createRow(2, null, 25L));  // null name
@@ -345,7 +351,7 @@ public class TestHoodieSparkLanceWriter {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_empty.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
       // Close without writing any rows
     }
 
@@ -393,7 +399,7 @@ public class TestHoodieSparkLanceWriter {
 
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_struct.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
       for (int i = 0; i < rows.size(); i++) {
         writer.writeRow("key" + i, rows.get(i));
       }

