This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990-Part3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 44911bca9b6433fa573adeb441a1a117858d671f
Author: YueZhang <[email protected]>
AuthorDate: Tue Apr 8 15:51:47 2025 +0800

    finish dry-run && rollback-command && show-config-command && add-expression-command
---
 .../model/PartitionBucketIndexHashingConfig.java   |  48 +++--
 .../procedures/PartitionBucketIndexManager.scala   | 204 ++++++++++++---------
 .../TestInsertTableWithPartitionBucketIndex.scala  | 154 ++++++++++++++--
 3 files changed, 289 insertions(+), 117 deletions(-)
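
For orientation, a sketch of how the finished commands might be invoked through the procedure once this change lands; the table name, expressions, and instant below are placeholders, not part of the patch:

    // Preview a rescale plan first (dryRun defaults to true in this patch).
    spark.sql("call partition_bucket_index_manager(table => 'h0', overwrite => 'dt=(2021\\-01\\-05),2', rule => 'regex', bucketNumber => 1)")
    // Apply the same rescale for real.
    spark.sql("call partition_bucket_index_manager(table => 'h0', overwrite => 'dt=(2021\\-01\\-05),2', rule => 'regex', bucketNumber => 1, dryRun => false)")
    // Stack a new expression on top of the latest committed hashing config.
    spark.sql("call partition_bucket_index_manager(table => 'h0', add => 'dt=(2021\\-01\\-06),3', dryRun => false)")
    // List every committed and archived hashing config.
    spark.sql("call partition_bucket_index_manager(table => 'h0', showConfig => true)")
    // Roll back the hashing config written by a given replace commit (instant is a placeholder).
    spark.sql("call partition_bucket_index_manager(table => 'h0', rollback => '20250408000000000')")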

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
index 4467607ef8b..10f8e249161 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
@@ -198,21 +198,44 @@ public class PartitionBucketIndexHashingConfig implements Serializable {
    * Get Latest committed hashing config instant to load.
    * If instant is empty, then return latest hashing config instant
    */
-  public static Option<String> getHashingConfigInstantToLoad(HoodieTableMetaClient metaClient, Option<String> instant) {
+  public static Option<StoragePath> getHashingConfigToLoad(HoodieTableMetaClient metaClient, Option<String> instant) {
     try {
+      String basePath = metaClient.getBasePath().toString();
       List<String> allCommittedHashingConfig = getCommittedHashingConfigInstants(metaClient);
       if (instant.isPresent()) {
-        Option<String> res = getHashingConfigInstantToLoadBeforeOrOn(allCommittedHashingConfig, instant.get());
+        Option<StoragePath> res = getHashingConfigInstantToLoadBeforeOrOn(allCommittedHashingConfig, instant.get()).map(i -> {
+          return getHashingConfigPath(basePath, i);
+        });
         // fall back to look up archived hashing config instant before return empty
-        return res.isPresent() ? res : getHashingConfigInstantToLoadBeforeOrOn(getArchiveHashingConfigInstants(metaClient), instant.get());
+        return res.isPresent() ? res : getHashingConfigInstantToLoadBeforeOrOn(getArchiveHashingConfigInstants(metaClient), instant.get()).map(i -> {
+          return getArchiveHashingConfigPath(basePath, i);
+        });
       } else {
-        return Option.of(allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1));
+        return Option.of(allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1)).map(i -> {
+          return getHashingConfigPath(basePath, i);
+        });
       }
     } catch (Exception e) {
       throw new HoodieException("Failed to get hashing config instant to load.", e);
     }
   }
 
+  public static List<PartitionBucketIndexHashingConfig> getAllHashingConfig(HoodieTableMetaClient metaClient) throws IOException {
+    String basePath = metaClient.getBasePath().toString();
+    List<StoragePath> allHashingConfig = getCommittedHashingConfigInstants(metaClient).stream().map(instant -> {
+      return getHashingConfigPath(basePath, instant);
+    }).collect(Collectors.toList());
+
+    if (metaClient.getStorage().exists(new StoragePath(metaClient.getArchiveHashingMetadataConfigPath()))) {
+      allHashingConfig.addAll(getArchiveHashingConfigInstants(metaClient).stream().map(instant -> {
+        return getArchiveHashingConfigPath(basePath, instant);
+      }).collect(Collectors.toList()));
+    }
+    return allHashingConfig.stream().map(hashingConfigPath -> {
+      return loadHashingConfig(metaClient.getStorage(), hashingConfigPath);
+    }).filter(Option::isPresent).map(Option::get).collect(Collectors.toList());
+  }
+
   private static Option<String> getHashingConfigInstantToLoadBeforeOrOn(List<String> hashingConfigInstants, String instant) {
     List<String> res = hashingConfigInstants.stream().filter(hashingConfigInstant -> {
       return hashingConfigInstant.compareTo(instant) <= 0;
@@ -221,20 +244,19 @@ public class PartitionBucketIndexHashingConfig implements Serializable {
   }
 
   public static PartitionBucketIndexHashingConfig loadingLatestHashingConfig(HoodieTableMetaClient metaClient) {
-    Option<String> instantToLoad = getHashingConfigInstantToLoad(metaClient, Option.empty());
-    ValidationUtils.checkArgument(instantToLoad.isPresent(), "Can not load latest hashing config " + instantToLoad);
-    Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad.get()));
-    ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load latest hashing config " + instantToLoad);
+    Option<StoragePath> hashingConfigToLoad = getHashingConfigToLoad(metaClient, Option.empty());
+    ValidationUtils.checkArgument(hashingConfigToLoad.isPresent(), "Can not load latest hashing config " + hashingConfigToLoad);
+    Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), hashingConfigToLoad.get());
+    ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load latest hashing config " + hashingConfigToLoad);
 
     return latestHashingConfig.get();
   }
 
   public static Option<PartitionBucketIndexHashingConfig> loadingLatestHashingConfigBeforeOrOn(HoodieTableMetaClient metaClient, String instant) {
-    Option<String> hashingConfigInstantToLoad = getHashingConfigInstantToLoad(metaClient, Option.of(instant));
-    if (hashingConfigInstantToLoad.isPresent()) {
-      Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(),
-          getHashingConfigPath(metaClient.getBasePath().toString(), hashingConfigInstantToLoad.get()));
-      ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load hashing config " + hashingConfigInstantToLoad + " based on " + instant);
+    Option<StoragePath> hashingConfigToLoad = getHashingConfigToLoad(metaClient, Option.of(instant));
+    if (hashingConfigToLoad.isPresent()) {
+      Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), hashingConfigToLoad.get());
+      ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load hashing config " + hashingConfigToLoad + " based on " + instant);
       return latestHashingConfig;
     } else {
       return Option.empty();
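
A minimal sketch of how the refactored loading path above is meant to be consumed: getHashingConfigToLoad now resolves directly to a StoragePath (active or archived) that loadHashingConfig can read. Method names come from the hunk above; the metaClient and instant wiring is assumed:

    // Assumed: a HoodieTableMetaClient and an instant time are already in scope.
    val pathOpt = PartitionBucketIndexHashingConfig.getHashingConfigToLoad(metaClient, Option.of(instant))
    if (pathOpt.isPresent) {
      // Returns Option.empty() if the config file at that path cannot be parsed.
      val hashingConfig = PartitionBucketIndexHashingConfig.loadHashingConfig(metaClient.getStorage, pathOpt.get())
    }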
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index f6b546cc4ab..927dbf85aa9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieSparkSqlWriter}
 import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, ENABLE_ROW_WRITER, OPERATION}
 import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig, SerializableSchema}
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
@@ -29,6 +30,7 @@ import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.{Option, ValidationUtils}
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig}
+import org.apache.hudi.config.HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.bucket.partition.{PartitionBucketIndexCalculator, PartitionBucketIndexUtils}
 import org.apache.hudi.internal.schema.InternalSchema
@@ -61,9 +63,9 @@ class PartitionBucketIndexManager extends BaseProcedure
     ProcedureParameter.optional(1, "overwrite", DataTypes.StringType),
     ProcedureParameter.optional(2, "bucketNumber", DataTypes.IntegerType, -1),
     ProcedureParameter.optional(3, "add", DataTypes.StringType),
-    ProcedureParameter.optional(4, "dry-run", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(4, "dryRun", DataTypes.BooleanType, true),
     ProcedureParameter.optional(5, "rollback", DataTypes.StringType),
-    ProcedureParameter.optional(6, "show-config", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(6, "showConfig", DataTypes.BooleanType, false),
     ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex"),
     // params => key=value, key2=value2
     ProcedureParameter.optional(8, "options", DataTypes.StringType)
@@ -117,11 +119,11 @@ class PartitionBucketIndexManager extends BaseProcedure
       if (showConfig) {
         handleShowConfig(metaClient)
       } else if (rollback != null) {
-        handleRollback(metaClient, rollback)
+        handleRollback(writeClient, metaClient, rollback)
       } else if (overwrite != null) {
         handleOverwrite(config, context, metaClient, overwrite, bucketNumber, rule, dryRun)
       } else if (add != null) {
-        handleAdd(metaClient, add, dryRun)
+        handleAdd(config, context, metaClient, add, dryRun)
       } else {
         Seq(Row("ERROR", "INVALID_OPERATION", "No valid operation specified"))
       }
@@ -179,76 +181,88 @@ class PartitionBucketIndexManager extends BaseProcedure
 
     // get partitions need to be rescaled
     val rescalePartitionsMap = getDifferentPartitions(partition2BucketWithNewHashingConfig.asScala, partition2BucketWithLatestHashingConfig.asScala)
-    val partitionsToRescale = rescalePartitionsMap.keys
-
-    // get all fileSlices need to read
-    val allFilesMap = FSUtils.getFilesInPartitions(context, metaClient.getStorage(), HoodieMetadataConfig.newBuilder.enable(mdtEnable).build,
-      metaClient.getBasePath.toString, partitionsToRescale.map(relative => {
-        new StoragePath(basePath, relative)
-      }).map(storagePath => storagePath.toString).toArray)
-    val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList
-    val view = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, files.asJava)
-    val allFileSlice = partitionsToRescale.flatMap(partitionPath => {
-      view.getLatestFileSlices(partitionPath).iterator().asScala
-    }).toList
-
-    // read all fileSlice para and get DF
-    var tableSchemaWithMetaFields: Schema = null
-    try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new TableSchemaResolver(metaClient).getTableAvroSchema(false), false)
-    catch {
-      case e: Exception =>
-        throw new HoodieException("Failed to get table schema during clustering", e)
-    }
+    if (dryRun) {
+      logInfo("Dry run OVERWRITE")
+      val rows = rescalePartitionsMap.map(entry => {
+        val details =
+          s"""
+             |${entry._1} => ${entry._2}
+             |""".stripMargin
+        details
+      }).toSeq
+      Seq(Row("SUCCESS", "DRY_RUN_OVERWRITE", s"""DETAILS:[$rows]"""))
+    } else {
+      logInfo("Perform OVERWRITE with dry-run disabled.")
+      val partitionsToRescale = rescalePartitionsMap.keys
+      // get all fileSlices need to read
+      val allFilesMap = FSUtils.getFilesInPartitions(context, metaClient.getStorage(), HoodieMetadataConfig.newBuilder.enable(mdtEnable).build,
+        metaClient.getBasePath.toString, partitionsToRescale.map(relative => {
+          new StoragePath(basePath, relative)
+        }).map(storagePath => storagePath.toString).toArray)
+      val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList
+      val view = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, files.asJava)
+      val allFileSlice = partitionsToRescale.flatMap(partitionPath => {
+        view.getLatestFileSlices(partitionPath).iterator().asScala
+      }).toList
+
+      // read all fileSlice para and get DF
+      var tableSchemaWithMetaFields: Schema = null
+      try tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new TableSchemaResolver(metaClient).getTableAvroSchema(false), false)
+      catch {
+        case e: Exception =>
+          throw new HoodieException("Failed to get table schema during clustering", e)
+      }
 
-    // broadcast reader context.
-    val broadcastManager = new SparkBroadcastManager(context, metaClient)
-    broadcastManager.prepareAndBroadcast()
-    val sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
+      // broadcast reader context.
+      val broadcastManager = new SparkBroadcastManager(context, metaClient)
+      broadcastManager.prepareAndBroadcast()
+      val sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
 
-    val res: RDD[InternalRow] = if (allFileSlice.isEmpty) {
-      spark.sparkContext.emptyRDD
-    } else {
-      val serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields)
-      val latestInstantTime = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
-
-      spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => {
-        // instantiate other supporting cast
-        val readerSchema = serializableTableSchemaWithMetaFields.get
-        val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath)
-        val internalSchemaOption: Option[InternalSchema] = Option.empty()
-        // instantiate FG reader
-        val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
-          metaClient.getStorage,
-          basePath.toString,
-          latestInstantTime.requestedTime(),
-          fileSlice,
-          readerSchema,
-          readerSchema,
-          internalSchemaOption, // not support evolution of schema for now
-          metaClient,
-          metaClient.getTableConfig.getProps,
-          0,
-          java.lang.Long.MAX_VALUE,
-          HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue(),
-          false)
-        fileGroupReader.initRecordIterators()
-        val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
-        iterator.asScala
-      })
-    }
-    val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields)
-    logInfo("Start to do bucket rescale for " + rescalePartitionsMap)
-    val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
-      sparkSession.sqlContext,
-      SaveMode.Append,
-      finalConfig,
-      dataFrame)
+      val res: RDD[InternalRow] = if (allFileSlice.isEmpty) {
+        spark.sparkContext.emptyRDD
+      } else {
+        val serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields)
+        val latestInstantTime = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+
+        spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => {
+          // instantiate other supporting cast
+          val readerSchema = serializableTableSchemaWithMetaFields.get
+          val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath)
+          val internalSchemaOption: Option[InternalSchema] = Option.empty()
+          // instantiate FG reader
+          val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
+            metaClient.getStorage,
+            basePath.toString,
+            latestInstantTime.requestedTime(),
+            fileSlice,
+            readerSchema,
+            readerSchema,
+            internalSchemaOption, // not support evolution of schema for now
+            metaClient,
+            metaClient.getTableConfig.getProps,
+            0,
+            java.lang.Long.MAX_VALUE,
+            HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue(),
+            false)
+          fileGroupReader.initRecordIterators()
+          val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
+          iterator.asScala
+        })
+      }
+      val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields)
+      logInfo("Start to do bucket rescale for " + rescalePartitionsMap)
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(
+        sparkSession.sqlContext,
+        SaveMode.Append,
+        finalConfig,
+        dataFrame)
 
-    val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: $dryRun"
+      val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: $dryRun"
 
-    val archived = PartitionBucketIndexHashingConfig.archiveHashingConfigIfNecessary(metaClient)
+      val archived = PartitionBucketIndexHashingConfig.archiveHashingConfigIfNecessary(metaClient)
 
-    Seq(Row("SUCCESS", "OVERWRITE", details))
+      Seq(Row("SUCCESS", "OVERWRITE", details))
+    }
   }
 
   /**
@@ -270,39 +284,47 @@ class PartitionBucketIndexManager extends BaseProcedure
   /**
    * Handle the add operation.
    */
-  private def handleAdd(metaClient: HoodieTableMetaClient, expression: String, dryRun: Boolean): Seq[Row] = {
-    // In a real implementation, this would call PartitionBucketIndexManager
-    // For now, just return a placeholder result
-    val details = s"Expression: $expression, Dry Run: $dryRun"
-
-    // Here would be the actual call to PartitionBucketIndexManager
-    // PartitionBucketIndexManager.addExpression(metaClient, expression, dryRun)
-
-    Seq(Row("SUCCESS", "ADD", details))
+  private def handleAdd(config: Map[String, String],
+                        context: HoodieEngineContext,
+                        metaClient: HoodieTableMetaClient,
+                        expression: String,
+                        dryRun: Boolean): Seq[Row] = {
+    logInfo("Handle Add Expression Operation")
+
+    val hashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient)
+    val latestExpression = hashingConfig.getExpressions
+
+    handleOverwrite(config, context, metaClient, s"""$expression;$latestExpression""", hashingConfig.getDefaultBucketNumber,
+      hashingConfig.getRule, dryRun)
   }
 
   /**
    * Handle the rollback operation.
    */
-  private def handleRollback(metaClient: HoodieTableMetaClient, instantTime: String): Seq[Row] = {
-    // In a real implementation, this would call PartitionBucketIndexManager
-    // For now, just return a placeholder result
-    val details = s"Rolled back bucket rescale action: $instantTime"
-
-    // Here would be the actual call to PartitionBucketIndexManager
-    // PartitionBucketIndexManager.rollback(metaClient, instantTime)
-
-    Seq(Row("SUCCESS", "ROLLBACK", details))
+  private def handleRollback(writeClient: SparkRDDWriteClient[_], metaClient: HoodieTableMetaClient, instantTime: String): Seq[Row] = {
+    logInfo("Handle Rollback Operation")
+    val hashingConfig = PartitionBucketIndexHashingConfig.loadHashingConfig(metaClient.getStorage, PartitionBucketIndexHashingConfig.getHashingConfigPath(metaClient.getBasePath.toString, instantTime))
+    if (hashingConfig.isPresent) {
+      logInfo("Start to rollback " + instantTime)
+      writeClient.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
+      writeClient.getConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS, hashingConfig.get().getExpressions)
+      val result = writeClient.rollback(instantTime)
+      Seq(Row("SUCCESS", "ROLLBACK", s"""$result to rollback $instantTime"""))
+    } else {
+      Seq(Row("FAILED", "ROLLBACK", null))
+    }
   }
 
   /**
    * Handle the show-config operation.
    */
   private def handleShowConfig(metaClient: HoodieTableMetaClient): Seq[Row] = {
-
-
-
-    Seq(Row("SUCCESS", "SHOW_CONFIG", null))
+    logInfo("Handle showConfig operation")
+    val hashingConfigs = PartitionBucketIndexHashingConfig.getAllHashingConfig(metaClient)
+    val res = hashingConfigs.asScala.map(config => {
+      config.toString
+    }).mkString("\\n")
+    Seq(Row("SUCCESS", "SHOW_CONFIG", res))
   }
 
   override def build: Procedure = new PartitionBucketIndexManager
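
A small sketch of the add-expression semantics implemented above: handleAdd prepends the new expression to the latest committed expressions and replays the result through the overwrite path, and the rollback command can later restore the previous config. The values below are placeholders in the test's style, not part of the patch:

    // Latest committed expressions, as read from the current hashing config.
    val latestExpression = "dt=(2021\\-01\\-07|2021\\-01\\-08),2"
    // New expression passed via add => '...'.
    val newExpression = "dt=(2021\\-01\\-09|2021\\-01\\-10),3"
    // handleAdd submits the combination through handleOverwrite.
    val combined = s"$newExpression;$latestExpression"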
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
index bca3d044617..6ae1a9c752f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala
@@ -19,12 +19,14 @@ package org.apache.spark.sql.hudi.dml
 
 import org.apache.hudi.common.model.{HoodieFailedWritesCleaningPolicy, PartitionBucketIndexHashingConfig}
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
 
@@ -91,7 +93,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
               val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2"
               val rule = "regex"
               val defaultBucketNumber = 1
-              spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+              spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
               checkAnswer(s"select id, name, price, ts, dt from $tableName")(
                 Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
@@ -132,6 +134,64 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Bucket Rescale Dry Run") {
+    withSQLConf(
+      "hoodie.datasource.write.operation" -> "bulk_insert",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2") {
+      withTempDir { tmp =>
+        withTable(generateTableName) { tableName =>
+          val tablePath = s"""${tmp.getCanonicalPath}/$tableName"""
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  dt string,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               | primaryKey = 'id,name',
+               | type = 'mor',
+               | preCombineField = 'ts',
+               | hoodie.index.type = 'BUCKET',
+               | hoodie.bucket.index.hash.field = 'id,name',
+               | hoodie.bucket.index.num.buckets = 1,
+               | hoodie.datasource.write.row.writer.enable = 'true')
+               | partitioned by (dt)
+               | location '${tablePath}'
+               | """.stripMargin)
+
+          // Note: Do not write the field alias, the partition field must be placed last.
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1,1', 10, 1000, "2021-01-05"),
+               | (11, 'a1,1', 10, 1000, "2021-01-05"),
+               | (2, 'a2', 20, 2000, "2021-01-06"),
+               | (22, 'a2', 20, 2000, "2021-01-06")
+               | """.stripMargin)
+
+          // upgrade to partition level bucket index and rescale dt=2021-01-05 from 1 to 2
+          val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-08),2"
+          val rule = "regex"
+          val defaultBucketNumber = 1
+          val sql = s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)"
+          val resExpect = ArrayBuffer(
+            s"""
+               |dt=2021-01-05 => 2
+               |""".stripMargin)
+
+          checkAnswer(sql)(
+            Seq("SUCCESS", "DRY_RUN_OVERWRITE", s"""DETAILS:[$resExpect]""")
+          )
+
+          val metaClient = createMetaClient(spark, tablePath)
+          // check there is no active hashing config
+          assert(!metaClient.getStorage.exists(new StoragePath(metaClient.getHashingMetadataConfigPath)))
+    }}}}
+
   test("Test Bulk Insert Into Partition Bucket Index Table Without Rescale") {
     withSQLConf(
       "hoodie.datasource.write.operation" -> "bulk_insert",
@@ -176,7 +236,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
             val expressions = "dt=2021\\-01\\-07,2"
             val rule = "regex"
             val defaultBucketNumber = 1
-            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
             checkAnswer(s"select id, name, price, ts, dt from $tableName")(
               Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
@@ -270,7 +330,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
             val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2"
             val rule = "regex"
             val defaultBucketNumber = 1
-            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
             checkAnswer(s"select id, name, price, ts, dt from $tableName")(
               Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
@@ -356,7 +416,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
           // do bucket rescale commit 2
           val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),4"
           val rule = "regex"
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           // do commit 3 update id = 1111
           spark.sql(
@@ -368,7 +428,7 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
           // do bucket rescale commit 4
           val expressions2 = "dt=(2021\\-01\\-05|2021\\-01\\-07),3"
           val rule2 = "regex"
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule2', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule2', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           // do commit 5 update id = 1111
           spark.sql(
@@ -456,10 +516,10 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
             val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2"
             val rule = "regex"
             val defaultBucketNumber = 1
-            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
             val expressions2 = "dt=(2021\\-01\\-05|2021\\-01\\-07),3"
-            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+            spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
             // delete latest replace commit
             val replaceCommit = metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant().get()
@@ -547,25 +607,93 @@ class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase {
           val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2"
           val rule = "regex"
           val defaultBucketNumber = 1
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           val expressions2 = "dt=(2021\\-01\\-05|2021\\-01\\-07),3"
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions2', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           val expressions3 = "dt=(2021\\-01\\-05|2021\\-01\\-07),4"
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions3', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions3', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           val expressions4 = "dt=(2021\\-01\\-05|2021\\-01\\-07),5"
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions4', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions4', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           val expressions5 = "dt=(2021\\-01\\-05|2021\\-01\\-07),6"
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions5', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions5', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           val expressions6 = "dt=(2021\\-01\\-05|2021\\-01\\-07),6"
-          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions6', rule => '$rule', bucketNumber => $defaultBucketNumber)")
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions6', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
 
           assert(PartitionBucketIndexHashingConfig.getArchiveHashingConfigInstants(metaClient).size() == 2)
           assert(PartitionBucketIndexHashingConfig.getCommittedHashingConfigInstants(metaClient).size() == 4)
+
+          // take care of showConfig command
+          val expected = PartitionBucketIndexHashingConfig.getAllHashingConfig(metaClient).asScala.map(config => {
+            config.toString
+          }).mkString("\\n")
+          checkAnswer(s"call partition_bucket_index_manager(table => '$tableName', showConfig => true)")(
+            Seq("SUCCESS", "SHOW_CONFIG", expected)
+          )
+        }
+      }
+    }
+  }
+
+  test("Test Add Expression and Rollback Command") {
+    withSQLConf(
+      "hoodie.datasource.write.operation" -> "upsert") {
+      withTempDir { tmp =>
+        withTable(generateTableName) { tableName =>
+          val tablePath = tmp.getCanonicalPath + "/" + tableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  dt string,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               | primaryKey = 'id,name',
+               | type = 'cow',
+               | preCombineField = 'ts',
+               | hoodie.index.type = 'BUCKET',
+               | hoodie.bucket.index.hash.field = 'id,name',
+               | hoodie.bucket.index.num.buckets = 1)
+               | partitioned by (dt)
+               | location '$tablePath'
+               | """.stripMargin)
+
+          // Note: Do not write the field alias, the partition field must be placed last.
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1,1', 10, 1000, "2021-01-05"),
+               | (11, 'a1,1', 10, 1000, "2021-01-05"),
+               | (2, 'a2', 20, 2000, "2021-01-06"),
+               | (22, 'a2', 20, 2000, "2021-01-06")
+               | """.stripMargin)
+          val metaClient = createMetaClient(spark, tablePath)
+
+          val expressions = "dt=(2021\\-01\\-07|2021\\-01\\-08),2"
+          val rule = "regex"
+          val defaultBucketNumber = 1
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber, dryRun => false)")
+
+          val expressions2 = "dt=(2021\\-01\\-09|2021\\-01\\-10),3"
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', add => '$expressions2', dryRun => false)")
+
+          val actualExpression = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient).getExpressions
+          val expectedExpression = s"""$expressions2;$expressions"""
+          assert(actualExpression.equals(expectedExpression.replace("\\","")))
+          val commit = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline.lastInstant()
+
+          // rollback latest committed hashing config
+          spark.sql(s"call partition_bucket_index_manager(table => '$tableName', rollback => '${commit.get().requestedTime()}')")
+          val actualExpression2 = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient).getExpressions
+          assert(actualExpression2.equals(expressions.replace("\\","")))
         }
       }
     }

