yihua commented on code in PR #13687:
URL: https://github.com/apache/hudi/pull/13687#discussion_r2267566229
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1773,4 +1778,204 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
.save(basePath)
assertEquals(10,
spark.read.format("org.apache.hudi").load(basePath).filter("rider =
'rider-001'").count())
}
+
+ @ParameterizedTest
+ @CsvSource(Array(
+ "6,8,true,UPGRADE", // Normal upgrade: table=6, write=8,
autoUpgrade=true → should upgrade
+ "6,8,false,VERSION_ADJUST", // Auto-upgrade disabled: table=6, write=8,
autoUpgrade=false → adjust version
+ "4,8,true,EXCEPTION", // Auto-upgrade enabled: Should throw
exception since table version is less than 6
+ "4,8,false,EXCEPTION" // Auto-upgrade disabled: Should throw
exception since table version is less than 6
+ ))
+ def testBaseHoodieWriteClientUpgradeDecisionLogic(
+ tableVersionStr: String,
+ writeVersionStr: String,
+ autoUpgrade: Boolean,
+ expectedResult: String): Unit = {
+ val tableVersion =
HoodieTableVersion.fromVersionCode(tableVersionStr.toInt)
+ val writeVersion =
HoodieTableVersion.fromVersionCode(writeVersionStr.toInt)
+ // Create a temporary directory for this test
+ val testBasePath =
s"${basePath}_upgrade_test_${tableVersionStr}_${writeVersionStr}_${autoUpgrade}_${System.currentTimeMillis()}"
+
+ try {
+ loadFixtureTable(testBasePath, tableVersion)
+
+ val testWriteOpts = getFixtureCompatibleWriteOpts(tableVersion) ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
writeVersion.versionCode().toString,
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> autoUpgrade.toString,
+ DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceWriteOptions.OPERATION.key ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
+ )
+
+ val testData = createFixtureCompatibleTestData()
+ val schema = StructType(Array(
+ StructField("id", StringType, false),
+ StructField("name", StringType, false),
+ StructField("ts", LongType, false),
+ StructField("partition", StringType, false)
+ ))
+ val inputDF =
spark.createDataFrame(spark.sparkContext.parallelize(testData), schema)
+
+ // Execute the test based on expected result
+ expectedResult match {
+ case "UPGRADE" =>
+ // Should perform upgrade successfully
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table was upgraded
+ val upgradeMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+
assertTrue(upgradeMetaClient.getTableConfig.getTableVersion.versionCode() >=
writeVersion.versionCode(),
+ s"Table should have been upgraded to at least version
${writeVersion.versionCode()}")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be preserved after
upgrade")
+
+ case "NO_UPGRADE" =>
+ // Should complete successfully without upgrade
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table version remained unchanged
+ val noUpgradeMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+ assertEquals(tableVersion,
noUpgradeMetaClient.getTableConfig.getTableVersion,
+ s"Table version should remain at $tableVersion")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be written
successfully without upgrade")
+
+ case "VERSION_ADJUST" =>
Review Comment:
This seems redundant, as the logic is the same as in the `NO_UPGRADE` case.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1773,4 +1778,204 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
.save(basePath)
assertEquals(10,
spark.read.format("org.apache.hudi").load(basePath).filter("rider =
'rider-001'").count())
}
+
+ @ParameterizedTest
+ @CsvSource(Array(
+ "6,8,true,UPGRADE", // Normal upgrade: table=6, write=8,
autoUpgrade=true → should upgrade
+ "6,8,false,VERSION_ADJUST", // Auto-upgrade disabled: table=6, write=8,
autoUpgrade=false → adjust version
+ "4,8,true,EXCEPTION", // Auto-upgrade enabled: Should throw
exception since table version is less than 6
+ "4,8,false,EXCEPTION" // Auto-upgrade disabled: Should throw
exception since table version is less than 6
+ ))
+ def testBaseHoodieWriteClientUpgradeDecisionLogic(
+ tableVersionStr: String,
+ writeVersionStr: String,
+ autoUpgrade: Boolean,
+ expectedResult: String): Unit = {
+ val tableVersion =
HoodieTableVersion.fromVersionCode(tableVersionStr.toInt)
+ val writeVersion =
HoodieTableVersion.fromVersionCode(writeVersionStr.toInt)
+ // Create a temporary directory for this test
+ val testBasePath =
s"${basePath}_upgrade_test_${tableVersionStr}_${writeVersionStr}_${autoUpgrade}_${System.currentTimeMillis()}"
+
+ try {
+ loadFixtureTable(testBasePath, tableVersion)
+
+ val testWriteOpts = getFixtureCompatibleWriteOpts(tableVersion) ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
writeVersion.versionCode().toString,
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> autoUpgrade.toString,
+ DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceWriteOptions.OPERATION.key ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
+ )
+
+ val testData = createFixtureCompatibleTestData()
+ val schema = StructType(Array(
+ StructField("id", StringType, false),
+ StructField("name", StringType, false),
+ StructField("ts", LongType, false),
+ StructField("partition", StringType, false)
+ ))
+ val inputDF =
spark.createDataFrame(spark.sparkContext.parallelize(testData), schema)
+
+ // Execute the test based on expected result
+ expectedResult match {
+ case "UPGRADE" =>
+ // Should perform upgrade successfully
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table was upgraded
+ val upgradeMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+
assertTrue(upgradeMetaClient.getTableConfig.getTableVersion.versionCode() >=
writeVersion.versionCode(),
+ s"Table should have been upgraded to at least version
${writeVersion.versionCode()}")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be preserved after
upgrade")
+
+ case "NO_UPGRADE" =>
+ // Should complete successfully without upgrade
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table version remained unchanged
+ val noUpgradeMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+ assertEquals(tableVersion,
noUpgradeMetaClient.getTableConfig.getTableVersion,
+ s"Table version should remain at $tableVersion")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be written
successfully without upgrade")
+
+ case "VERSION_ADJUST" =>
+ // Should adjust write version and complete successfully
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table version remained unchanged
+ val adjustMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+ assertEquals(tableVersion,
adjustMetaClient.getTableConfig.getTableVersion,
+ s"Table version should remain at $tableVersion when autoUpgrade is
disabled")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be written
successfully after version adjustment")
+
+ case "EXCEPTION" =>
+ // Should throw HoodieUpgradeDowngradeException
+ val exception =
assertThrows(classOf[HoodieUpgradeDowngradeException]) {
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+ }
+
+ // Verify exception message contains expected content
+ val expectedMessageFragment = if (tableVersion.versionCode() <
HoodieTableVersion.SIX.versionCode()) {
+ // For Hudi 1.1.0: any table version < 6 throws exception with
this message
+ "Hudi 1.x release only supports table version greater than version
6 or above"
+ } else {
+ "upgrade"
+ }
+ assertTrue(exception.getMessage.contains(expectedMessageFragment),
+ s"Exception message should contain '${expectedMessageFragment}',
but was: ${exception.getMessage}")
+ }
+
+ } finally {
+ // Cleanup test directory
+ try {
+ val testPath = new StoragePath(testBasePath)
+ if (storage.exists(testPath)) {
+ storage.deleteDirectory(testPath)
+ }
+ } catch {
+ case _: Exception => // Ignore cleanup errors
+ }
+ }
+ }
+
+ private def loadFixtureTable(testBasePath: String, version:
HoodieTableVersion): HoodieTableMetaClient = {
+ val fixtureName = getFixtureName(version)
+ val resourcePath = s"/upgrade-downgrade-fixtures/mor-tables/$fixtureName"
+
+ // Create temporary directory for fixture extraction
+ val tempFixtureDir = Files.createTempDirectory("hudi-fixture-")
+
+ try {
+ // Extract fixture to temporary directory
+ HoodieTestUtils.extractZipToDirectory(resourcePath, tempFixtureDir,
this.getClass)
+
+ // Copy extracted table to test location
+ val tableName = fixtureName.replace(".zip", "")
+ val extractedTablePath = tempFixtureDir.resolve(tableName).toFile
+ val testTablePath = new File(testBasePath)
+
+ // Copy the extracted table contents to our test path
+ FileUtils.copyDirectory(extractedTablePath, testTablePath)
+
+ // Verify the table was loaded at the expected version
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+ assertEquals(version, metaClient.getTableConfig.getTableVersion,
+ s"Fixture table should be at version ${version}")
+ metaClient
+ } finally {
+ // Cleanup temporary fixture directory
+ try {
+ FileUtils.deleteDirectory(tempFixtureDir.toFile)
+ } catch {
+ case _: Exception => // Ignore cleanup errors
+ }
+ }
+ }
+
+ private def getFixtureName(version: HoodieTableVersion): String = {
+ version match {
+ case HoodieTableVersion.FOUR => "hudi-v4-table.zip"
+ case HoodieTableVersion.FIVE => "hudi-v5-table.zip"
+ case HoodieTableVersion.SIX => "hudi-v6-table.zip"
+ case HoodieTableVersion.EIGHT => "hudi-v8-table.zip"
+ case HoodieTableVersion.NINE => "hudi-v9-table.zip"
+ case _ => throw new IllegalArgumentException(s"Unsupported fixture
version: $version")
+ }
+ }
+
+ private def getFixtureCompatibleWriteOpts(version: HoodieTableVersion):
Map[String, String] = {
Review Comment:
Redundant argument: `fixtureName` and `tableName`, which are derived from `version`,
are not used.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1773,4 +1778,204 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
.save(basePath)
assertEquals(10,
spark.read.format("org.apache.hudi").load(basePath).filter("rider =
'rider-001'").count())
}
+
+ @ParameterizedTest
+ @CsvSource(Array(
+ "6,8,true,UPGRADE", // Normal upgrade: table=6, write=8,
autoUpgrade=true → should upgrade
+ "6,8,false,VERSION_ADJUST", // Auto-upgrade disabled: table=6, write=8,
autoUpgrade=false → adjust version
+ "4,8,true,EXCEPTION", // Auto-upgrade enabled: Should throw
exception since table version is less than 6
+ "4,8,false,EXCEPTION" // Auto-upgrade disabled: Should throw
exception since table version is less than 6
+ ))
+ def testBaseHoodieWriteClientUpgradeDecisionLogic(
+ tableVersionStr: String,
+ writeVersionStr: String,
+ autoUpgrade: Boolean,
+ expectedResult: String): Unit = {
+ val tableVersion =
HoodieTableVersion.fromVersionCode(tableVersionStr.toInt)
+ val writeVersion =
HoodieTableVersion.fromVersionCode(writeVersionStr.toInt)
+ // Create a temporary directory for this test
+ val testBasePath =
s"${basePath}_upgrade_test_${tableVersionStr}_${writeVersionStr}_${autoUpgrade}_${System.currentTimeMillis()}"
+
+ try {
+ loadFixtureTable(testBasePath, tableVersion)
+
+ val testWriteOpts = getFixtureCompatibleWriteOpts(tableVersion) ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
writeVersion.versionCode().toString,
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> autoUpgrade.toString,
+ DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceWriteOptions.OPERATION.key ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
+ )
+
+ val testData = createFixtureCompatibleTestData()
+ val schema = StructType(Array(
+ StructField("id", StringType, false),
+ StructField("name", StringType, false),
+ StructField("ts", LongType, false),
+ StructField("partition", StringType, false)
+ ))
+ val inputDF =
spark.createDataFrame(spark.sparkContext.parallelize(testData), schema)
+
+ // Execute the test based on expected result
+ expectedResult match {
+ case "UPGRADE" =>
+ // Should perform upgrade successfully
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table was upgraded
+ val upgradeMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+
assertTrue(upgradeMetaClient.getTableConfig.getTableVersion.versionCode() >=
writeVersion.versionCode(),
Review Comment:
This should use `assertEquals`.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1773,4 +1778,204 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
.save(basePath)
assertEquals(10,
spark.read.format("org.apache.hudi").load(basePath).filter("rider =
'rider-001'").count())
}
+
+ @ParameterizedTest
+ @CsvSource(Array(
+ "6,8,true,UPGRADE", // Normal upgrade: table=6, write=8,
autoUpgrade=true → should upgrade
+ "6,8,false,VERSION_ADJUST", // Auto-upgrade disabled: table=6, write=8,
autoUpgrade=false → adjust version
+ "4,8,true,EXCEPTION", // Auto-upgrade enabled: Should throw
exception since table version is less than 6
+ "4,8,false,EXCEPTION" // Auto-upgrade disabled: Should throw
exception since table version is less than 6
+ ))
+ def testBaseHoodieWriteClientUpgradeDecisionLogic(
+ tableVersionStr: String,
+ writeVersionStr: String,
+ autoUpgrade: Boolean,
+ expectedResult: String): Unit = {
+ val tableVersion =
HoodieTableVersion.fromVersionCode(tableVersionStr.toInt)
+ val writeVersion =
HoodieTableVersion.fromVersionCode(writeVersionStr.toInt)
+ // Create a temporary directory for this test
+ val testBasePath =
s"${basePath}_upgrade_test_${tableVersionStr}_${writeVersionStr}_${autoUpgrade}_${System.currentTimeMillis()}"
+
+ try {
+ loadFixtureTable(testBasePath, tableVersion)
+
+ val testWriteOpts = getFixtureCompatibleWriteOpts(tableVersion) ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
writeVersion.versionCode().toString,
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> autoUpgrade.toString,
+ DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceWriteOptions.OPERATION.key ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
+ )
+
+ val testData = createFixtureCompatibleTestData()
+ val schema = StructType(Array(
+ StructField("id", StringType, false),
+ StructField("name", StringType, false),
+ StructField("ts", LongType, false),
+ StructField("partition", StringType, false)
+ ))
+ val inputDF =
spark.createDataFrame(spark.sparkContext.parallelize(testData), schema)
+
+ // Execute the test based on expected result
+ expectedResult match {
+ case "UPGRADE" =>
+ // Should perform upgrade successfully
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table was upgraded
+ val upgradeMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+
assertTrue(upgradeMetaClient.getTableConfig.getTableVersion.versionCode() >=
writeVersion.versionCode(),
+ s"Table should have been upgraded to at least version
${writeVersion.versionCode()}")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be preserved after
upgrade")
+
+ case "NO_UPGRADE" =>
+ // Should complete successfully without upgrade
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table version remained unchanged
+ val noUpgradeMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+ assertEquals(tableVersion,
noUpgradeMetaClient.getTableConfig.getTableVersion,
+ s"Table version should remain at $tableVersion")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be written
successfully without upgrade")
+
+ case "VERSION_ADJUST" =>
+ // Should adjust write version and complete successfully
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+
+ // Verify table version remained unchanged
+ val adjustMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+ assertEquals(tableVersion,
adjustMetaClient.getTableConfig.getTableVersion,
+ s"Table version should remain at $tableVersion when autoUpgrade is
disabled")
+
+ // Verify data integrity
+ val resultDF = spark.read.format("hudi").load(testBasePath)
+ assertEquals(11, resultDF.count(), "Data should be written
successfully after version adjustment")
+
+ case "EXCEPTION" =>
+ // Should throw HoodieUpgradeDowngradeException
+ val exception =
assertThrows(classOf[HoodieUpgradeDowngradeException]) {
+ inputDF.write.format("hudi")
+ .options(testWriteOpts)
+ .mode(SaveMode.Append)
+ .save(testBasePath)
+ }
+
+ // Verify exception message contains expected content
+ val expectedMessageFragment = if (tableVersion.versionCode() <
HoodieTableVersion.SIX.versionCode()) {
+ // For Hudi 1.1.0: any table version < 6 throws exception with
this message
+ "Hudi 1.x release only supports table version greater than version
6 or above"
+ } else {
+ "upgrade"
+ }
+ assertTrue(exception.getMessage.contains(expectedMessageFragment),
+ s"Exception message should contain '${expectedMessageFragment}',
but was: ${exception.getMessage}")
+ }
+
+ } finally {
+ // Cleanup test directory
+ try {
+ val testPath = new StoragePath(testBasePath)
+ if (storage.exists(testPath)) {
+ storage.deleteDirectory(testPath)
+ }
+ } catch {
+ case _: Exception => // Ignore cleanup errors
+ }
+ }
+ }
+
+ private def loadFixtureTable(testBasePath: String, version:
HoodieTableVersion): HoodieTableMetaClient = {
+ val fixtureName = getFixtureName(version)
+ val resourcePath = s"/upgrade-downgrade-fixtures/mor-tables/$fixtureName"
+
+ // Create temporary directory for fixture extraction
+ val tempFixtureDir = Files.createTempDirectory("hudi-fixture-")
+
+ try {
+ // Extract fixture to temporary directory
+ HoodieTestUtils.extractZipToDirectory(resourcePath, tempFixtureDir,
this.getClass)
+
+ // Copy extracted table to test location
+ val tableName = fixtureName.replace(".zip", "")
+ val extractedTablePath = tempFixtureDir.resolve(tableName).toFile
+ val testTablePath = new File(testBasePath)
+
+ // Copy the extracted table contents to our test path
+ FileUtils.copyDirectory(extractedTablePath, testTablePath)
+
+ // Verify the table was loaded at the expected version
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(testBasePath)
+ .build()
+ assertEquals(version, metaClient.getTableConfig.getTableVersion,
+ s"Fixture table should be at version ${version}")
+ metaClient
+ } finally {
+ // Cleanup temporary fixture directory
+ try {
+ FileUtils.deleteDirectory(tempFixtureDir.toFile)
+ } catch {
+ case _: Exception => // Ignore cleanup errors
+ }
+ }
+ }
+
+ private def getFixtureName(version: HoodieTableVersion): String = {
Review Comment:
Let's reuse the same method in `TestUpgradeDowngrade`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]