This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e5f121cdd61 [HUDI-9215] Set partitionColumnsWithKeyGenerator based on
table version (#13025)
e5f121cdd61 is described below
commit e5f121cdd6148ad365efa9f0d1703f0d821b9fd8
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue Mar 25 23:28:52 2025 +0530
[HUDI-9215] Set partitionColumnsWithKeyGenerator based on table version
(#13025)
---
.../scala/org/apache/hudi/util/SparkKeyGenUtils.scala | 6 ++++--
.../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 10 ++++++----
.../java/org/apache/hudi/cli/BootstrapExecutorUtils.java | 4 +++-
.../hudi/utilities/streamer/BootstrapExecutor.java | 3 ++-
.../org/apache/hudi/utilities/streamer/StreamSync.java | 3 ++-
.../utilities/deltastreamer/TestHoodieDeltaStreamer.java | 16 ++++++++++++++++
6 files changed, 33 insertions(+), 9 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index 9df55526e2f..a495bbb4707 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -18,6 +18,7 @@
package org.apache.hudi.util
import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.table.HoodieTableVersion
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -37,11 +38,12 @@ object SparkKeyGenUtils {
/**
* @param properties config properties
+ * @param writerTableVersion table version used by writer
* @return partition columns
*/
- def getPartitionColumnsForKeyGenerator(props: TypedProperties): String = {
+ def getPartitionColumnsForKeyGenerator(props: TypedProperties,
writerTableVersion: HoodieTableVersion): String = {
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
- getPartitionColumns(keyGenerator, props, true)
+ getPartitionColumns(keyGenerator, props, writerTableVersion.versionCode()
> HoodieTableVersion.SIX.versionCode())
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 58cfebf596d..424dd9b4c20 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -277,6 +277,7 @@ class HoodieSparkSqlWriterInternal {
}
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
+ val tableVersion = Integer.valueOf(getStringWithAltKeys(parameters,
HoodieWriteConfig.WRITE_TABLE_VERSION))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not
performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(),
common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -284,7 +285,7 @@ class HoodieSparkSqlWriterInternal {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig,
tblName, operation, fs)
val partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters),
false)
- val partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters),
true)
+ val partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters),
HoodieTableVersion.fromVersionCode(tableVersion))
val timelineTimeZone =
HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
val tableMetaClient = if (tableExists) {
HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
@@ -305,7 +306,7 @@ class HoodieSparkSqlWriterInternal {
else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
HoodieTableMetaClient.newTableBuilder()
.setTableType(tableType)
- .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters,
HoodieWriteConfig.WRITE_TABLE_VERSION)))
+ .setTableVersion(tableVersion)
.setDatabaseName(databaseName)
.setTableName(tblName)
.setBaseFileFormat(baseFileFormat)
@@ -729,7 +730,8 @@ class HoodieSparkSqlWriterInternal {
if (!tableExists) {
val archiveLogFolder =
hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_HISTORY_PATH)
- val partitionColumnsWithType =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters))
+ val tableVersion = Integer.valueOf(getStringWithAltKeys(parameters,
HoodieWriteConfig.WRITE_TABLE_VERSION))
+ val partitionColumnsWithType =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(toProperties(parameters),
HoodieTableVersion.fromVersionCode(tableVersion))
val recordKeyFields =
hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val payloadClass =
hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)
val recordMergerStrategy =
hoodieConfig.getString(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID)
@@ -752,7 +754,7 @@ class HoodieSparkSqlWriterInternal {
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
.setRecordKeyFields(recordKeyFields)
- .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters,
HoodieWriteConfig.WRITE_TABLE_VERSION)))
+ .setTableVersion(tableVersion)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(payloadClass)
.setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 40cb963ec72..fa30988eb45 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -281,7 +282,8 @@ public class BootstrapExecutorUtils implements Serializable
{
keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new
HoodieConfig(props));
}
props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
- String partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
+ HoodieTableVersion tableVersion =
HoodieTableVersion.fromVersionCode(props.getInteger(HoodieWriteConfig.WRITE_TABLE_VERSION.key(),
HoodieTableVersion.current().versionCode()));
+ String partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props, tableVersion);
if (StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
partitionColumnsForKeyGenerator = null;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
index 453c17ec4f4..547d49122ef 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -233,7 +234,7 @@ public class BootstrapExecutor implements Serializable {
.setPartitionMetafileUseBaseFormat(props.getBoolean(
PARTITION_METAFILE_USE_BASE_FORMAT.key(),
PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
- String partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props);
+ String partitionColumnsForKeyGenerator =
SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props,
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props,
WRITE_TABLE_VERSION)));
if (!StringUtils.isNullOrEmpty(partitionColumnsForKeyGenerator)) {
builder.setPartitionFields(partitionColumnsForKeyGenerator).setKeyGeneratorClassProp(
props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
SimpleKeyGenerator.class.getName()));
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index bf3377a20ea..a67149c7190 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -419,7 +420,7 @@ public class StreamSync implements Serializable, Closeable {
private HoodieTableMetaClient initializeEmptyTable() throws IOException {
return initializeEmptyTable(HoodieTableMetaClient.newTableBuilder(),
- SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props),
+ SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(props,
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props,
WRITE_TABLE_VERSION))),
HadoopFSUtils.getStorageConfWithCopy(hoodieSparkContext.hadoopConfiguration()));
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 675c03894c0..86c7502348e 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -79,6 +79,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.keygen.ComplexKeyGenerator;
+import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metrics.Metrics;
@@ -474,6 +475,21 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertEquals(configFlag,
Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT"})
+ public void testPartitionKeyFieldsBasedOnVersion(HoodieTableVersion version)
throws IOException {
+ String tablePath = basePath + "/partition_key_fields_meta_client" +
version.versionCode();
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tablePath,
WriteOperationType.INSERT);
+ cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" +
version.versionCode());
+ cfg.configs.add(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() + "=" +
CustomKeyGenerator.class.getName());
+
cfg.configs.add("hoodie.datasource.write.partitionpath.field=partition_path:simple");
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamer.getIngestionService().ingestOnce();
+ HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(context, tablePath);
+ String expectedPartitionFields = version.equals(HoodieTableVersion.SIX) ?
"partition_path" : "partition_path:simple";
+ assertEquals(expectedPartitionFields,
metaClient.getTableConfig().getString(HoodieTableConfig.PARTITION_FIELDS));
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType
recordType) throws Exception {