This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 30b4e8f3f31 [HUDI-9369] Simplify bloom filter config passing in metadata table writer (#13253)
30b4e8f3f31 is described below
commit 30b4e8f3f3102ef31dc7120af8d7538f2bdb3b21
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri May 2 22:27:39 2025 -0700
[HUDI-9369] Simplify bloom filter config passing in metadata table writer (#13253)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 2 +-
.../apache/hudi/config/TestHoodieWriteConfig.java | 16 ++++++++
.../client/utils/SparkMetadataWriterUtils.java | 30 ++++++++-------
.../SparkHoodieBackedTableMetadataWriter.java | 6 +--
.../hudi/common/config/HoodieStorageConfig.java | 4 ++
.../common/config/TestHoodieStorageConfig.java | 43 ++++++++++++++++++++++
.../hudi/feature/index/TestExpressionIndex.scala | 2 +-
7 files changed, 84 insertions(+), 19 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 770f0535d00..812d083a70e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2055,7 +2055,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public String getBloomFilterType() {
- return getString(HoodieStorageConfig.BLOOM_FILTER_TYPE);
+ return getStorageConfig().getBloomFilterType();
}
public int getDynamicBloomFilterMaxNumEntries() {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 716f62d2894..90709ce8c7d 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -22,7 +22,9 @@ import
org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.transaction.lock.NoopLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -55,6 +57,7 @@ import java.util.Set;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -709,6 +712,19 @@ public class TestHoodieWriteConfig {
writeConfig.getViewStorageConfig().getMaxMemoryForFileGroupMap());
}
+ @Test
+ void testBloomFilterType() {
+ String bloomFilterType = BloomFilterTypeCode.SIMPLE.name();
+
assertNotEquals(HoodieStorageConfig.BLOOM_FILTER_TYPE.defaultValue().toUpperCase(),
+ bloomFilterType.toUpperCase());
+ Properties props = new Properties();
+ props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), bloomFilterType);
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props).build();
+ assertEquals(bloomFilterType, config.getBloomFilterType());
+ }
+
private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
final Properties properties = new Properties();
configs.forEach(properties::setProperty);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index d8ad5943138..28ae8622cdf 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineType;
@@ -206,10 +207,11 @@ public class SparkMetadataWriterUtils {
: new ExpressionIndexComputationMetadata(colStatRecords);
}
- public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String
columnToIndex, HoodieWriteConfig metadataWriteConfig, String instantTime,
-
HoodieIndexDefinition indexDefinition) {
+ public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingBloomFilter(
+ Dataset<Row> dataset, String columnToIndex, HoodieStorageConfig
storageConfig, String instantTime,
+ HoodieIndexDefinition indexDefinition) {
String indexName = indexDefinition.getIndexName();
- setBloomFilterProps(metadataWriteConfig,
indexDefinition.getIndexOptions());
+ setBloomFilterProps(storageConfig, indexDefinition.getIndexOptions());
// Group data using expression index metadata and then create bloom filter
on the group
Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex,
SparkMetadataWriterUtils.getExpressionIndexColumnNames())
@@ -219,22 +221,22 @@ public class SparkMetadataWriterUtils {
String partition = pair.getLeft().toString();
String relativeFilePath = pair.getRight().toString();
String fileName = FSUtils.getFileName(relativeFilePath, partition);
- BloomFilter bloomFilter =
HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
+ BloomFilter bloomFilter =
HoodieFileWriterFactory.createBloomFilter(storageConfig);
iterator.forEachRemaining(row -> {
byte[] key = row.getAs(columnToIndex).toString().getBytes();
bloomFilter.add(key);
});
ByteBuffer bloomByteBuffer =
ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
- HoodieRecord bloomFilterRecord =
createBloomFilterMetadataRecord(partition, fileName, instantTime,
metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
+ HoodieRecord bloomFilterRecord =
createBloomFilterMetadataRecord(partition, fileName, instantTime,
storageConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
return Collections.singletonList(bloomFilterRecord).iterator();
}), Encoders.kryo(HoodieRecord.class));
return new
ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
}
- private static void setBloomFilterProps(HoodieWriteConfig
metadataWriteConfig, Map<String, String> indexOptions) {
+ private static void setBloomFilterProps(HoodieStorageConfig storageConfig,
Map<String, String> indexOptions) {
BLOOM_FILTER_CONFIG_MAPPING.forEach((sourceKey, targetKey) -> {
if (indexOptions.containsKey(sourceKey)) {
- metadataWriteConfig.getProps().setProperty(targetKey,
indexOptions.get(sourceKey));
+ storageConfig.getProps().setProperty(targetKey,
indexOptions.get(sourceKey));
}
});
}
@@ -305,16 +307,15 @@ public class SparkMetadataWriterUtils {
* @param instantTime Instant time
* @param engineContext HoodieEngineContext
* @param dataWriteConfig Write Config for the data table
- * @param metadataWriteConfig Write config for the metadata table
* @param partitionRecordsFunctionOpt Function used to generate
partition stat records for the EI. It takes the column range metadata generated
for the provided partition files as input
* and uses those to generate the
final partition stats
* @return ExpressionIndexComputationMetadata containing both EI column stat
records and partition stat records if partitionRecordsFunctionOpt is provided
*/
- @SuppressWarnings("checkstyle:LineLength")
- public static ExpressionIndexComputationMetadata
getExprIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String
instantTime,
-
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
HoodieWriteConfig metadataWriteConfig,
-
Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>,
HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
+ public static ExpressionIndexComputationMetadata getExprIndexRecords(
+ List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet,
HoodieIndexDefinition indexDefinition,
+ HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema,
String instantTime,
+ HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
+ Option<Function<HoodiePairData<String,
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>
partitionRecordsFunctionOpt) {
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
if (indexDefinition.getSourceFields().isEmpty()) {
// In case there are no columns to index, bail
@@ -348,7 +349,8 @@ public class SparkMetadataWriterUtils {
if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
} else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
- return getExpressionIndexRecordsUsingBloomFilter(rowDataset,
columnToIndex, metadataWriteConfig, instantTime, indexDefinition);
+ return getExpressionIndexRecordsUsingBloomFilter(
+ rowDataset, columnToIndex, dataWriteConfig.getStorageConfig(),
instantTime, indexDefinition);
} else {
throw new UnsupportedOperationException(indexDefinition.getIndexType() +
" is not yet supported");
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 863a4995f6e..875ebd858ad 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -18,7 +18,6 @@
package org.apache.hudi.metadata;
-import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -41,6 +40,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import
org.apache.hudi.index.expression.HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata;
import org.apache.hudi.metrics.DistributedRegistry;
@@ -198,7 +198,7 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
// with the expression index records from the unmodified files to get the
new partition stat records
HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata
expressionIndexComputationMetadata =
SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs,
indexDefinition, dataMetaClient, parallelism, readerSchema, instantTime,
engineContext, dataWriteConfig,
- metadataWriteConfig, partitionRecordsFunctionOpt);
+ partitionRecordsFunctionOpt);
return
expressionIndexComputationMetadata.getPartitionStatRecordsOption().isPresent()
?
expressionIndexComputationMetadata.getExpressionIndexRecords().union(expressionIndexComputationMetadata.getPartitionStatRecordsOption().get())
: expressionIndexComputationMetadata.getExpressionIndexRecords();
@@ -211,7 +211,7 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
Schema
readerSchema, StorageConfiguration<?> storageConf,
String
instantTime) {
ExpressionIndexComputationMetadata expressionIndexComputationMetadata =
SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathAndSizeTriplet,
indexDefinition,
- metaClient, parallelism, readerSchema, instantTime, engineContext,
dataWriteConfig, metadataWriteConfig,
+ metaClient, parallelism, readerSchema, instantTime, engineContext,
dataWriteConfig,
Option.of(rangeMetadata ->
HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(rangeMetadata,
true, Option.of(indexDefinition.getIndexName()))));
HoodieData<HoodieRecord> exprIndexRecords =
expressionIndexComputationMetadata.getExpressionIndexRecords();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 88b30860e33..ffe96b1ac66 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -385,6 +385,10 @@ public class HoodieStorageConfig extends HoodieConfig {
super();
}
+ public String getBloomFilterType() {
+ return getString(BLOOM_FILTER_TYPE);
+ }
+
public static HoodieStorageConfig.Builder newBuilder() {
return new Builder();
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
new file mode 100644
index 00000000000..2ec7cd356e1
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class TestHoodieStorageConfig {
+ @Test
+ void testHoodieStorageConfig() {
+ String bloomFilterType = BloomFilterTypeCode.SIMPLE.name();
+
assertNotEquals(HoodieStorageConfig.BLOOM_FILTER_TYPE.defaultValue().toUpperCase(),
+ bloomFilterType.toUpperCase());
+ Properties props = new Properties();
+ props.put(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), bloomFilterType);
+ HoodieStorageConfig config = HoodieStorageConfig.newBuilder()
+ .fromProperties(props).build();
+ assertEquals(bloomFilterType, config.getBloomFilterType());
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
index 63ce1cb1c3e..5ea40b85592 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
@@ -2190,7 +2190,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
HoodieExpressionIndex.DYNAMIC_BLOOM_MAX_ENTRIES -> "1000"
)
val bloomFilterRecords =
SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(df, "c5",
- HoodieWriteConfig.newBuilder().withPath("a/b").build(), "",
+ HoodieStorageConfig.newBuilder().build(), "",
HoodieIndexDefinition.newBuilder().withIndexName("random").withIndexOptions(JavaConverters.mapAsJavaMapConverter(indexOptions).asJava).build())
.getExpressionIndexRecords
// Since there is only one partition file pair there is only one bloom
filter record