xushiyan commented on a change in pull request #4489:
URL: https://github.com/apache/hudi/pull/4489#discussion_r780645945



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
##########
@@ -42,27 +46,49 @@
 public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
 
-  private List<String> partitions;
+  private final List<String> partitions;
+  private final boolean purge;
   public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, List<String> partitions) {
-    super(context, config, table, instantTime,null, WriteOperationType.DELETE_PARTITION);
+                                                  String instantTime, List<String> partitions,
+                                                  boolean purge) {
+    super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
     this.partitions = partitions;
+    this.purge = purge;
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
-        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
-    HoodieWriteMetadata result = new HoodieWriteMetadata();
-    result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
-    result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+    try {
+      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
+          .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+      HoodieWriteMetadata result = new HoodieWriteMetadata();
+      result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+      result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
 
-    result.setWriteStatuses(jsc.emptyRDD());
-    this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
-    this.commitOnAutoCommit(result);
-    return result;
+      // delete partition's path

Review comment:
       this comment is redundant

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -109,15 +111,31 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
             : hoodieWriteStat.getTotalWriteBytes();
         newFiles.put(filename, totalWriteBytes);
       });
+
       // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
+      if (!newFiles.isEmpty()) {
+        HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
+                partition, Option.of(newFiles), Option.empty());
+        records.add(record);
+      }
     });
 
-    // New partitions created
-    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
-    records.add(record);
+    // Add delete partition's record
+    if (commitMetadata instanceof HoodieReplaceCommitMetadata
+            && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {

Review comment:
       can this be addressed with some refactoring?
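
       For illustration, one possible shape (a rough sketch only; the helper name and its placement inside HoodieTableMetadataUtil are my assumptions, not something this PR proposes) would be to pull the instance-check plus operation-type test out of the record-building code:

```java
// Hypothetical helper, sketched for discussion; the name and placement are assumptions.
// It isolates the DELETE_PARTITION special case so the record-building logic stays linear.
private static boolean isDeletePartitionOperation(HoodieCommitMetadata commitMetadata) {
  return commitMetadata instanceof HoodieReplaceCommitMetadata
      && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType());
}
```

       The call site could then read `if (isDeletePartitionOperation(commitMetadata)) { ... } else { records.add(HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions))); }`, keeping the current behavior for regular commits.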

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
##########
@@ -42,27 +46,49 @@
 public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
 
-  private List<String> partitions;
+  private final List<String> partitions;
+  private final boolean purge;
   public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, List<String> partitions) {
-    super(context, config, table, instantTime,null, WriteOperationType.DELETE_PARTITION);
+                                                  String instantTime, List<String> partitions,
+                                                  boolean purge) {
+    super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
     this.partitions = partitions;
+    this.purge = purge;
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
-        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
-    HoodieWriteMetadata result = new HoodieWriteMetadata();
-    result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
-    result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+    try {
+      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
+          .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+      HoodieWriteMetadata result = new HoodieWriteMetadata();
+      result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+      result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
 
-    result.setWriteStatuses(jsc.emptyRDD());
-    this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
-    this.commitOnAutoCommit(result);
-    return result;
+      // delete partition's path
+      if (purge) {
+        deletePartitionsPath();
+      }
+
+      result.setWriteStatuses(jsc.emptyRDD());
+      this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+      this.commitOnAutoCommit(result);
+      return result;
+    } catch (Exception e) {
+      throw new HoodieDeletePartitionException("Failed to execute delete partitions for commit time " + instantTime, e);
+    }
+  }
+
+  private void deletePartitionsPath() throws IOException {
+    String basePath = table.getMetaClient().getBasePath();
+    for (String partition : partitions) {
+      Path fullPartitionPath = FSUtils.getPartitionPath(basePath, partition);
+      if (table.getMetaClient().getFs().exists(fullPartitionPath)) {
+        table.getMetaClient().getFs().delete(fullPartitionPath, true);

Review comment:
       Tricky thing is: can we even roll back a purged delete-partition? Should we make an exception and treat delete partition as irreversible?
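
       To make the concern concrete, here is a minimal sketch (assuming auto-commit is enabled; this alternative ordering is a discussion aid, not what the PR implements) that defers the physical purge until after the replacecommit completes, so a failure anywhere before the commit leaves the partition data on storage and the instant still rollbackable:

```java
// Sketch for discussion only: defer the physical purge until after the commit.
// Everything up to commitOnAutoCommit() can still be undone by rolling back the
// instant; once deletePartitionsPath() runs, the files are removed from storage
// and no rollback or restore can bring them back.
result.setWriteStatuses(jsc.emptyRDD());
this.saveWorkloadProfileMetadataToInflight(
    new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
this.commitOnAutoCommit(result);   // the logical delete-partition is now durable on the timeline
if (purge) {
  deletePartitionsPath();          // point of no return: physically irreversible
}
return result;
```

       Whichever ordering we land on, it is probably worth documenting explicitly that a purged delete-partition cannot be undone by rollback or restore.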

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
##########
@@ -42,27 +46,49 @@
 public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
 
-  private List<String> partitions;
+  private final List<String> partitions;
+  private final boolean purge;
   public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, List<String> partitions) {
-    super(context, config, table, instantTime,null, WriteOperationType.DELETE_PARTITION);
+                                                  String instantTime, List<String> partitions,
+                                                  boolean purge) {
+    super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
     this.partitions = partitions;
+    this.purge = purge;
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
-        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
-    HoodieWriteMetadata result = new HoodieWriteMetadata();
-    result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
-    result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+    try {
+      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
+          .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+      HoodieWriteMetadata result = new HoodieWriteMetadata();
+      result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+      result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
 
-    result.setWriteStatuses(jsc.emptyRDD());
-    this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
-    this.commitOnAutoCommit(result);
-    return result;
+      // delete partition's path
+      if (purge) {
+        deletePartitionsPath();
+      }
+
+      result.setWriteStatuses(jsc.emptyRDD());
+      this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+      this.commitOnAutoCommit(result);
+      return result;
+    } catch (Exception e) {
+      throw new HoodieDeletePartitionException("Failed to execute delete partitions for commit time " + instantTime, e);
+    }
+  }
+
+  private void deletePartitionsPath() throws IOException {

Review comment:
       ```suggestion
     private void deletePartitionsPaths() throws IOException {
   ```



