This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cad08b1f2cfd feat(lance): Support bloom filter in Lance writer and reader (#18304)
cad08b1f2cfd is described below
commit cad08b1f2cfd637db050d37dee8fd9efddd5fc6a
Author: Vova Kolmakov <[email protected]>
AuthorDate: Thu Mar 19 09:00:51 2026 +0700
feat(lance): Support bloom filter in Lance writer and reader (#18304)
---
.../io/storage/HoodieSparkFileWriterFactory.java | 4 +-
.../hudi/io/storage/HoodieSparkLanceReader.java | 68 ++++++++++++++++++----
.../hudi/io/storage/HoodieSparkLanceWriter.java | 31 +++++-----
.../row/HoodieBloomFilterRowWriteSupport.java | 58 ++++++++++++++++++
.../storage/row/HoodieRowParquetWriteSupport.java | 29 ---------
.../hudi/io/lance/HoodieBaseLanceWriter.java | 20 ++++++-
.../io/storage/TestHoodieSparkLanceReader.java | 36 +++++++++++-
.../io/storage/TestHoodieSparkLanceWriter.java | 22 ++++---
8 files changed, 201 insertions(+), 67 deletions(-)
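Overview: the writer now threads an optional BloomFilter through HoodieBaseLanceWriter; record keys are added to the filter as rows are written, and close() attaches the serialized filter plus min/max record keys as custom metadata on the Lance file's Arrow schema, where the reader deserializes them back. A minimal end-to-end sketch (path, sparkSchema, row, and the surrounding setup are placeholders; the writer/reader signatures are the ones introduced in the diff below):

    BloomFilter filter = BloomFilterFactory.createBloomFilter(
        1000,                 // expected number of entries
        0.0001,               // target false-positive probability
        10000,                // max entries (used by dynamic filters)
        BloomFilterTypeCode.SIMPLE.name());
    try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
        path, sparkSchema, instantTime, taskContextSupplier, storage,
        false /* populateMetaFields */, Option.of(filter))) {
      writer.writeRow("key0", row);
    } // close() flushes buffered batches, then writes the filter into schema metadata

    try (HoodieSparkLanceReader reader = new HoodieSparkLanceReader(path)) {
      BloomFilter restored = reader.readBloomFilter();   // null if no filter was written
      String[] minMax = reader.readMinMaxRecordKeys();   // {min, max}
    }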
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index bd4ba955d04a..9ebb94a0bb54 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -109,8 +109,10 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
                                                         TaskContextSupplier taskContextSupplier) throws IOException {
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
     StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+    boolean enableBloomFilter = enableBloomFilter(populateMetaFields, config);
+    Option<BloomFilter> bloomFilter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
-    return new HoodieSparkLanceWriter(path, structType, instantTime, taskContextSupplier, storage, populateMetaFields);
+    return new HoodieSparkLanceWriter(path, structType, instantTime, taskContextSupplier, storage, populateMetaFields, bloomFilter);
   }

   private static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema schema,
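Note: enableBloomFilter(...) and createBloomFilter(...) are inherited from HoodieFileWriterFactory, so the Lance path is gated by the same bloom-filter write configuration the Parquet path already uses rather than new settings. Constructed directly, the equivalent is what the new tests below do:

    // Equivalent construction used by the tests in this patch; the four arguments
    // correspond to expected entries, false-positive rate, dynamic max entries,
    // and the filter type code.
    BloomFilter bf = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, SIMPLE.name());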
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index f99bf2ce9eac..3bf6625a4fd5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -20,6 +20,8 @@ package org.apache.hudi.io.storage;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.bloom.SimpleBloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.schema.HoodieSchema;
@@ -32,6 +34,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.memory.HoodieArrowAllocator;
import org.apache.hudi.storage.StoragePath;
+import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -46,13 +49,19 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
/**
* {@link HoodieSparkFileReader} implementation for Lance file format.
*/
+@Slf4j
public class HoodieSparkLanceReader implements HoodieSparkFileReader {
// Memory size for data read operations: 120MB
public static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024;
@@ -63,19 +72,52 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
// number of rows to read
private static final int DEFAULT_BATCH_SIZE = 512;
private final StoragePath path;
+ private final BufferAllocator metadataAllocator;
+ private final LanceFileReader lanceMetadataReader;
+ private final Schema arrowSchema;
public HoodieSparkLanceReader(StoragePath path) {
this.path = path;
+    metadataAllocator = HoodieArrowAllocator.newChildAllocator(
+        getClass().getSimpleName() + "-metadata-" + path.getName(), LANCE_METADATA_ALLOCATOR_SIZE);
+    try {
+      lanceMetadataReader = LanceFileReader.open(path.toString(), metadataAllocator);
+      arrowSchema = lanceMetadataReader.schema();
+    } catch (Exception e) {
+      close();
+      throw new HoodieException("Failed to create lanceMetadataReader: " + path, e);
+    }
   }
@Override
public String[] readMinMaxRecordKeys() {
-    throw new UnsupportedOperationException("Min/max record key tracking is not yet supported for Lance file format");
+    Map<String, String> metadata = arrowSchema.getCustomMetadata();
+    if (metadata != null && !metadata.isEmpty()) {
+      String minKey = metadata.get(HOODIE_MIN_RECORD_KEY_FOOTER);
+      String maxKey = metadata.get(HOODIE_MAX_RECORD_KEY_FOOTER);
+      if (minKey != null && maxKey != null) {
+        return new String[] { minKey, maxKey };
+      }
+    }
+    throw new HoodieException("Could not read min/max record key out of Lance file: " + path);
}
@Override
public BloomFilter readBloomFilter() {
-    throw new UnsupportedOperationException("Bloom filter is not yet supported for Lance file format");
+    BloomFilter toReturn = null;
+    Map<String, String> metadata = arrowSchema.getCustomMetadata();
+    if (metadata != null && !metadata.isEmpty()) {
+      if (metadata.containsKey(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) {
+        String bloomSer = metadata.get(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+        String filterType = metadata.get(HOODIE_BLOOM_FILTER_TYPE_CODE);
+        if (filterType != null && filterType.contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+          toReturn = new HoodieDynamicBoundedBloomFilter(bloomSer);
+        } else {
+          toReturn = new SimpleBloomFilter(bloomSer);
+        }
+      }
+    }
+ return toReturn;
}
@Override
@@ -159,10 +201,7 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
   @Override
   public HoodieSchema getSchema() {
-    try (BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
-        getClass().getSimpleName() + "-metadata-" + path.getName(), LANCE_METADATA_ALLOCATOR_SIZE);
-        LanceFileReader reader = LanceFileReader.open(path.toString(), allocator)) {
-      Schema arrowSchema = reader.schema();
+    try {
       StructType structType = LanceArrowUtils.fromArrowSchema(arrowSchema);
       return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "record", "", false);
} catch (Exception e) {
@@ -172,15 +211,22 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
   @Override
   public void close() {
-    // noop as resources are managed by the LanceRecordIterator and not within this reader class.
+    if (lanceMetadataReader != null) {
+      try {
+        lanceMetadataReader.close();
+      } catch (Exception e) {
+        log.warn("Error while closing metadataLanceReader: {}", e.getMessage());
+      }
+    }
+    if (metadataAllocator != null) {
+      metadataAllocator.close();
+    }
}
@Override
public long getTotalRecords() {
-    try (BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
-        getClass().getSimpleName() + "-metadata-" + path.getName(), LANCE_METADATA_ALLOCATOR_SIZE);
-        LanceFileReader reader = LanceFileReader.open(path.toString(), allocator)) {
-      return reader.numRows();
+    try {
+      return lanceMetadataReader.numRows();
     } catch (Exception e) {
       throw new HoodieException("Failed to get row count from Lance file: " + path, e);
     }
}
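The point of exposing readBloomFilter() and readMinMaxRecordKeys() is key pruning during index lookup. A hypothetical caller-side sketch (candidateKeys and the surrounding method are illustrative, not part of this patch):

    // Assumes: import java.util.List; import java.util.stream.Collectors;
    try (HoodieSparkLanceReader reader = new HoodieSparkLanceReader(path)) {
      String[] minMax = reader.readMinMaxRecordKeys();
      BloomFilter filter = reader.readBloomFilter();
      List<String> probable = candidateKeys.stream()
          .filter(k -> minMax[0].compareTo(k) <= 0 && k.compareTo(minMax[1]) <= 0)
          .filter(filter::mightContain)  // no false negatives; false positives possible
          .collect(Collectors.toList());
      // only 'probable' keys need an actual record lookup in the file
    }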
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index 08aae1979d8e..e029fcbab010 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -18,10 +18,13 @@
package org.apache.hudi.io.storage;
+import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.io.storage.row.HoodieBloomFilterRowWriteSupport;
import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -49,10 +52,11 @@ import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECO
*
* This writer integrates with Hudi's storage I/O layer and supports:
* - Hudi metadata field population
- * - Record key tracking (for bloom filters - TODO https://github.com/apache/hudi/issues/17664)
+ * - Record key tracking (for bloom filters)
  * - Sequence ID generation
+ * - Min/max record key tracking
  */
-public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
+public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow, UTF8String>
     implements HoodieSparkFileWriter, HoodieInternalRowFileWriter {
private static final String DEFAULT_TIMEZONE = "UTC";
@@ -73,15 +77,16 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
* @param taskContextSupplier Task context supplier for partition ID
* @param storage HoodieStorage instance
* @param populateMetaFields Whether to populate Hudi metadata fields
- * @throws IOException if writer initialization fails
+ * @param bloomFilterOpt Optional bloom filter for record key tracking
*/
   public HoodieSparkLanceWriter(StoragePath file,
                                 StructType sparkSchema,
                                 String instantTime,
                                 TaskContextSupplier taskContextSupplier,
                                 HoodieStorage storage,
-                                boolean populateMetaFields) throws IOException {
-    super(file, DEFAULT_BATCH_SIZE);
+                                boolean populateMetaFields,
+                                Option<BloomFilter> bloomFilterOpt) {
+    super(file, DEFAULT_BATCH_SIZE, bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
     this.sparkSchema = sparkSchema;
     this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema, DEFAULT_TIMEZONE, true, false);
     this.fileName = UTF8String.fromString(file.getName());
this.fileName = UTF8String.fromString(file.getName());
@@ -100,34 +105,34 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
* @param sparkSchema Spark schema for the data
* @param taskContextSupplier Task context supplier for partition ID
* @param storage HoodieStorage instance
- * @throws IOException if writer initialization fails
*/
   public HoodieSparkLanceWriter(StoragePath file,
                                 StructType sparkSchema,
                                 TaskContextSupplier taskContextSupplier,
-                                HoodieStorage storage) throws IOException {
-    this(file, sparkSchema, null, taskContextSupplier, storage, false);
+                                HoodieStorage storage) {
+    this(file, sparkSchema, null, taskContextSupplier, storage, false, Option.empty());
   }
@Override
   public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException {
+    UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+    bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> bloomFilterWriteSupport.addKey(recordKey));
     if (populateMetaFields) {
-      UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
       updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
-      super.write(row);
-    } else {
-      super.write(row);
     }
+    super.write(row);
}
@Override
public void writeRow(String recordKey, InternalRow row) throws IOException {
+ bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+ bloomFilterWriteSupport.addKey(UTF8String.fromString(recordKey)));
super.write(row);
}
@Override
public void writeRow(UTF8String key, InternalRow row) throws IOException {
-    // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
+    bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> bloomFilterWriteSupport.addKey(key));
super.write(row);
}
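With this change every write path feeds the filter, not just the meta-field path; writeRowWithMetadata now hashes the key before deciding whether to stamp metadata columns. A minimal sketch of the three entry points (row0..row2 are placeholder InternalRows):

    writer.writeRowWithMetadata(new HoodieKey("key0", "p0"), row0); // HoodieKey
    writer.writeRow("key1", row1);                                  // String key
    writer.writeRow(UTF8String.fromString("key2"), row2);           // UTF8String key

The UTF8String overload avoids a String round-trip, which is why HoodieBaseLanceWriter is now parameterized on the key type K.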
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowWriteSupport.java
new file mode 100644
index 000000000000..4a4519a90041
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowWriteSupport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+
+import org.apache.spark.sql.HoodieUTF8StringFactory;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Bloom filter write support implementation for Lance file format.
+ * Handles UTF8String record keys specific to Spark InternalRow.
+ */
+public class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
+
+ private static final HoodieUTF8StringFactory UTF8STRING_FACTORY =
+ SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();
+
+ public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
+ super(bloomFilter);
+ }
+
+ @Override
+ protected int compareRecordKey(UTF8String a, UTF8String b) {
+    return UTF8STRING_FACTORY.wrapUTF8String(a).compareTo(UTF8STRING_FACTORY.wrapUTF8String(b));
+ }
+
+ @Override
+ protected byte[] getUTF8Bytes(UTF8String key) {
+ return key.getBytes();
+ }
+
+ @Override
+ protected UTF8String dereference(UTF8String key) {
+    // NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in
+    //       cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
+    //       and simply do a pass over when it holds a (immutable) buffer holding just the string
+ return key.clone();
+ }
+}
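For context, HoodieBloomFilterWriteSupport<T> (the parent class, already used by the Parquet writer) drives these three hooks. A sketch of its assumed shape -- simplified and not part of this patch -- showing why dereference() matters when keys point into reused buffers:

    // Assumed contract of the parent class (simplified):
    public void addKey(T recordKey) {
      bloomFilter.add(getUTF8Bytes(recordKey));
      if (minRecordKey == null || compareRecordKey(recordKey, minRecordKey) < 0) {
        minRecordKey = dereference(recordKey); // copy key out of any shared buffer
      }
      if (maxRecordKey == null || compareRecordKey(recordKey, maxRecordKey) > 0) {
        maxRecordKey = dereference(recordKey);
      }
    }

Without the clone() in dereference(), a retained min/max UTF8String could silently change when the underlying row buffer is recycled.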
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 9d0da7f534af..be2ed64ece72 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -45,7 +45,6 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
-import org.apache.spark.sql.HoodieUTF8StringFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -384,34 +383,6 @@ public class HoodieRowParquetWriteSupport extends
WriteSupport<InternalRow> {
};
}
-  private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
-
- private static final HoodieUTF8StringFactory UTF8STRING_FACTORY =
- SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();
-
- public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
- super(bloomFilter);
- }
-
- @Override
- protected int compareRecordKey(UTF8String a, UTF8String b) {
-      return UTF8STRING_FACTORY.wrapUTF8String(a).compareTo(UTF8STRING_FACTORY.wrapUTF8String(b));
- }
-
- @Override
- protected byte[] getUTF8Bytes(UTF8String key) {
- return key.getBytes();
- }
-
- @Override
- protected UTF8String dereference(UTF8String key) {
-    // NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in
-    //       cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
-    //       and simply do a pass over when it holds a (immutable) buffer holding just the string
- return key.clone();
- }
- }
-
   public static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
return (HoodieRowParquetWriteSupport) ReflectionUtils.loadClass(
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
index c8f9f950ccb4..74dfcab40eba 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
@@ -19,6 +19,8 @@
package org.apache.hudi.io.lance;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.memory.HoodieArrowAllocator;
import org.apache.hudi.storage.StoragePath;
@@ -34,6 +36,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Map;
/**
* Base class for Hudi Lance file writers supporting different record types.
@@ -43,13 +46,14 @@ import java.io.IOException;
* - BufferAllocator management
* - Record buffering and batch flushing
* - File size checks
+ * - Bloom filter metadata writing
*
* Subclasses must implement type-specific conversion to Arrow format.
*
* @param <R> The record type (e.g., GenericRecord, InternalRow)
*/
@NotThreadSafe
-public abstract class HoodieBaseLanceWriter<R> implements Closeable {
+public abstract class HoodieBaseLanceWriter<R, K extends Comparable<K>> implements Closeable {
/** Memory size for data write operations: 120MB */
private static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024L;
@@ -62,6 +66,7 @@ public abstract class HoodieBaseLanceWriter<R> implements Closeable {
private int currentBatchSize = 0;
private VectorSchemaRoot root;
private ArrowWriter<R> arrowWriter;
+  protected final Option<HoodieBloomFilterWriteSupport<K>> bloomFilterWriteSupportOpt;
private LanceFileWriter writer;
@@ -70,12 +75,15 @@ public abstract class HoodieBaseLanceWriter<R> implements Closeable {
*
* @param path Path where Lance file will be written
* @param batchSize Number of records to buffer before flushing to Lance
+   * @param bloomFilterWriteSupportOpt Optional bloom filter write support for record key tracking
*/
- protected HoodieBaseLanceWriter(StoragePath path, int batchSize) {
+ protected HoodieBaseLanceWriter(StoragePath path, int batchSize,
+                                  Option<HoodieBloomFilterWriteSupport<K>> bloomFilterWriteSupportOpt) {
     this.path = path;
     this.allocator = HoodieArrowAllocator.newChildAllocator(
         getClass().getSimpleName() + "-data-" + path.getName(), LANCE_DATA_ALLOCATOR_SIZE);
this.batchSize = batchSize;
+ this.bloomFilterWriteSupportOpt = bloomFilterWriteSupportOpt;
}
/**
@@ -152,6 +160,14 @@ public abstract class HoodieBaseLanceWriter<R> implements Closeable {
root.setRowCount(0);
writer.write(root);
}
+
+ // Finalize and write bloom filter metadata
+ if (writer != null && bloomFilterWriteSupportOpt.isPresent()) {
+        Map<String, String> metadata = bloomFilterWriteSupportOpt.get().finalizeMetadata();
+ if (!metadata.isEmpty()) {
+ writer.addSchemaMetadata(metadata);
+ }
+ }
} catch (Exception e) {
primaryException = e;
}
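finalizeMetadata() comes from HoodieBloomFilterWriteSupport and carries the same payload the Parquet writer stores in its footer; here it lands in the Lance file's Arrow schema custom metadata via addSchemaMetadata(). The expected entries (the key constants are the ones imported by the reader above; the exact presence rules follow the parent class as understood here):

    Map<String, String> metadata = bloomFilterWriteSupportOpt.get().finalizeMetadata();
    // HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY -> serialized bloom filter
    // HOODIE_BLOOM_FILTER_TYPE_CODE         -> present for dynamic filters
    // HOODIE_MIN_RECORD_KEY_FOOTER          -> smallest record key written
    // HOODIE_MAX_RECORD_KEY_FOOTER          -> largest record key written
    writer.addSchemaMetadata(metadata);  // persisted as Arrow schema custom metadata

This is exactly what HoodieSparkLanceReader.readBloomFilter() and readMinMaxRecordKeys() read back.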
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
index ae827f73d7f0..7d43430753f6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceReader.java
@@ -20,10 +20,15 @@ package org.apache.hudi.io.storage;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.bloom.SimpleBloomFilter;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.storage.HoodieStorage;
@@ -57,10 +62,13 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
+import static org.apache.hudi.common.bloom.BloomFilterTypeCode.DYNAMIC_V0;
+import static org.apache.hudi.common.bloom.BloomFilterTypeCode.SIMPLE;
import static org.apache.hudi.io.storage.LanceTestUtils.createRow;
 import static org.apache.hudi.io.storage.LanceTestUtils.createRowWithMetaFields;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -75,12 +83,14 @@ public class TestHoodieSparkLanceReader {
private HoodieStorage storage;
private SparkTaskContextSupplier taskContextSupplier;
private String instantTime;
+ private BloomFilter simpleBloomFilter;
@BeforeEach
public void setUp() throws IOException {
storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
taskContextSupplier = new SparkTaskContextSupplier();
instantTime = "20251201120000000";
+    simpleBloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, SIMPLE.name());
}
@AfterEach
@@ -294,8 +304,9 @@ public class TestHoodieSparkLanceReader {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_large.lance");
     int recordCount = 2500;
+    BloomFilter dynamicBloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, DYNAMIC_V0.name());
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(dynamicBloomFilter))) {
       for (int i = 0; i < recordCount; i++) {
         GenericInternalRow row = new GenericInternalRow(new Object[]{i, (long) i * 2});
writer.writeRow("key" + i, row);
@@ -320,6 +331,8 @@ public class TestHoodieSparkLanceReader {
}
assertEquals(recordCount, count, "Should read all records");
+
+      assertBloomFilter(reader, HoodieDynamicBoundedBloomFilter.class, "key0", "key999", 2500);
}
}
@@ -533,6 +546,8 @@ public class TestHoodieSparkLanceReader {
seqNos.add(row.getUTF8String(1).toString());
}
assertEquals(3, seqNos.size(), "All sequence numbers should be unique");
+
+ assertBloomFilter(reader, SimpleBloomFilter.class, "key0", "key2", 3);
}
}
@@ -563,7 +578,7 @@ public class TestHoodieSparkLanceReader {
   private HoodieSparkLanceReader writeAndCreateReader(StoragePath path, StructType schema, List<InternalRow> rows, boolean populateMetaFields) throws IOException {
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, populateMetaFields)) {
+        path, schema, instantTime, taskContextSupplier, storage, populateMetaFields, Option.of(simpleBloomFilter))) {
for (int i = 0; i < rows.size(); i++) {
HoodieKey key = new HoodieKey("key" + i, "default_partition");
        // Note writeRowWithMetadata implicitly handles case where populateMetaFields=false
@@ -588,8 +603,9 @@ public class TestHoodieSparkLanceReader {
// Write Lance file with full schema
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_projection.lance");
+    BloomFilter dynamicBloom = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, DYNAMIC_V0.name());
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, fullSchema, instantTime, taskContextSupplier, storage, false)) {
+        path, fullSchema, instantTime, taskContextSupplier, storage, false, Option.of(dynamicBloom))) {
for (int i = 0; i < rows.size(); i++) {
writer.writeRow("key" + i, rows.get(i));
}
@@ -623,7 +639,21 @@ public class TestHoodieSparkLanceReader {
count++;
}
assertEquals(3, count, "Should read all 3 records");
+
+        assertBloomFilter(reader, HoodieDynamicBoundedBloomFilter.class, "key0", "key2", 3);
}
}
}
+
+  private void assertBloomFilter(HoodieSparkLanceReader reader, Class<?> clazz, String minKey, String maxKey, int keyCount) {
+ BloomFilter bloomFilter = reader.readBloomFilter();
+ assertInstanceOf(clazz, bloomFilter);
+ for (int i = 0; i < keyCount; i++) {
+ assertTrue(bloomFilter.mightContain("key" + i));
+ }
+ assertFalse(bloomFilter.mightContain("no_such_key"));
+ String[] minMaxKeys = reader.readMinMaxRecordKeys();
+ assertEquals(minKey, minMaxKeys[0]);
+ assertEquals(maxKey, minMaxKeys[1]);
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
index 7add395f913f..c1d907fce01b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
@@ -19,8 +19,11 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.memory.HoodieArrowAllocator;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -53,6 +56,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.hudi.common.bloom.BloomFilterTypeCode.SIMPLE;
 import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
@@ -77,12 +81,14 @@ public class TestHoodieSparkLanceWriter {
private HoodieStorage storage;
private SparkTaskContextSupplier taskContextSupplier;
private String instantTime;
+ private BloomFilter simpleBloomFilter;
@BeforeEach
public void setUp() throws IOException {
storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
taskContextSupplier = new SparkTaskContextSupplier();
instantTime = "20251201120000000";
+    simpleBloomFilter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, SIMPLE.name());
}
@AfterEach
@@ -99,7 +105,7 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_with_metadata.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, true)) {
+        path, schema, instantTime, taskContextSupplier, storage, true, Option.of(simpleBloomFilter))) {
       // Write multiple records to test metadata population and sequence ID generation
for (int i = 0; i < 3; i++) {
InternalRow row = createRowWithMetaFields(i, "User" + i, 20L + i);
@@ -168,7 +174,7 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_without_metadata.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
// Create row with just user data (no meta fields)
InternalRow row = createRow(1, "Bob", 25L);
HoodieKey key = new HoodieKey("key2", "partition2");
@@ -212,7 +218,7 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_simple_write.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
InternalRow row = createRow(1, "Charlie", 35L);
writer.writeRow("key3", row);
}
@@ -233,7 +239,7 @@ public class TestHoodieSparkLanceWriter {
// Write more than DEFAULT_BATCH_SIZE (1000) records
int recordCount = 2500;
try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
for (int i = 0; i < recordCount; i++) {
InternalRow row = createRow(i, "User" + i, 20L + i);
writer.writeRow("key" + i, row);
@@ -261,7 +267,7 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_primitives.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
GenericInternalRow row = new GenericInternalRow(new Object[]{
42, // int
123456789L, // long
@@ -308,7 +314,7 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_nulls.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
// Write rows with null values
writer.writeRow("key1", createRow(1, "Alice", 30L));
writer.writeRow("key2", createRow(2, null, 25L)); // null name
@@ -345,7 +351,7 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_empty.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
// Close without writing any rows
}
@@ -393,7 +399,7 @@ public class TestHoodieSparkLanceWriter {
     StoragePath path = new StoragePath(tempDir.getAbsolutePath() + "/test_struct.lance");
     try (HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
-        path, schema, instantTime, taskContextSupplier, storage, false)) {
+        path, schema, instantTime, taskContextSupplier, storage, false, Option.of(simpleBloomFilter))) {
for (int i = 0; i < rows.size(); i++) {
writer.writeRow("key" + i, rows.get(i));
}