yihua commented on code in PR #13687:
URL: https://github.com/apache/hudi/pull/13687#discussion_r2267538988


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1773,4 +1778,204 @@ class TestMORDataSource extends 
HoodieSparkClientTestBase with SparkDatasetMixin
       .save(basePath)
     assertEquals(10, 
spark.read.format("org.apache.hudi").load(basePath).filter("rider = 
'rider-001'").count())
   }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "6,8,true,UPGRADE",           // Normal upgrade: table=6, write=8, 
autoUpgrade=true → should upgrade
+    "6,8,false,VERSION_ADJUST",   // Auto-upgrade disabled: table=6, write=8, 
autoUpgrade=false → adjust version
+    "4,8,true,EXCEPTION",         // Auto-upgrade enabled: Should throw 
exception since table version is less than 6
+    "4,8,false,EXCEPTION"         // Auto-upgrade disabled: Should throw 
exception since table version is less than 6
+  ))
+  def testBaseHoodieWriteClientUpgradeDecisionLogic(
+    tableVersionStr: String,
+    writeVersionStr: String,
+    autoUpgrade: Boolean,
+    expectedResult: String): Unit = {
+    val tableVersion = 
HoodieTableVersion.fromVersionCode(tableVersionStr.toInt)
+    val writeVersion = 
HoodieTableVersion.fromVersionCode(writeVersionStr.toInt)
+    // Create a temporary directory for this test
+    val testBasePath = 
s"${basePath}_upgrade_test_${tableVersionStr}_${writeVersionStr}_${autoUpgrade}_${System.currentTimeMillis()}"
+
+    try {
+      loadFixtureTable(testBasePath, tableVersion)
+
+      val testWriteOpts = getFixtureCompatibleWriteOpts(tableVersion) ++ Map(
+        HoodieWriteConfig.WRITE_TABLE_VERSION.key -> 
writeVersion.versionCode().toString,
+        HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> autoUpgrade.toString,
+        DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+        DataSourceWriteOptions.OPERATION.key -> 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
+      )
+
+      val testData = createFixtureCompatibleTestData()
+      val schema = StructType(Array(
+        StructField("id", StringType, false),
+        StructField("name", StringType, false),
+        StructField("ts", LongType, false),
+        StructField("partition", StringType, false)
+      ))
+      val inputDF = 
spark.createDataFrame(spark.sparkContext.parallelize(testData), schema)
+
+      // Execute the test based on expected result
+      expectedResult match {
+        case "UPGRADE" =>
+          // Should perform upgrade successfully
+          inputDF.write.format("hudi")
+            .options(testWriteOpts)
+            .mode(SaveMode.Append)
+            .save(testBasePath)
+
+          // Verify table was upgraded
+          val upgradeMetaClient = HoodieTableMetaClient.builder()
+            .setConf(storageConf.newInstance())
+            .setBasePath(testBasePath)
+            .build()
+          
assertTrue(upgradeMetaClient.getTableConfig.getTableVersion.versionCode() >= 
writeVersion.versionCode(),
+            s"Table should have been upgraded to at least version 
${writeVersion.versionCode()}")
+
+          // Verify data integrity
+          val resultDF = spark.read.format("hudi").load(testBasePath)
+          assertEquals(11, resultDF.count(), "Data should be preserved after 
upgrade")

Review Comment:
   Could we do full data validation here, similar to `TestUpgradeDowngrade`, 
rather than only asserting on the row count?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java:
##########
@@ -85,19 +85,44 @@ public UpgradeDowngrade(
 
   public static boolean needsUpgradeOrDowngrade(HoodieTableMetaClient 
metaClient, HoodieWriteConfig config, HoodieTableVersion toWriteVersion) {
     HoodieTableVersion fromTableVersion = 
metaClient.getTableConfig().getTableVersion();
-    return needsUpgrade(metaClient, config, toWriteVersion) || 
toWriteVersion.versionCode() < fromTableVersion.versionCode();
+    return needsUpgrade(metaClient, config, toWriteVersion) || 
needsDowngrade(fromTableVersion, toWriteVersion);
+  }
+
+  public static boolean needsDowngrade(HoodieTableVersion fromTableVersion, 
HoodieTableVersion toWriteVersion) {
+    if (toWriteVersion.lesserThan(HoodieTableVersion.SIX)) {
+      // for 1.1 we will do not allow downgrades to below SIX
+      // user will have to downgrade the table using a prior hudi version.
+      throw new HoodieUpgradeDowngradeException(
+              String.format("1.1.0 only supports table version greater then 
version SIX or above."
+                      + " Please downgrade table from version %s to %s using a 
hudi version prior to 1.1.0", fromTableVersion, toWriteVersion));

Review Comment:
   fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to