xushiyan commented on a change in pull request #4856:
URL: https://github.com/apache/hudi/pull/4856#discussion_r817704373



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
##########
@@ -48,69 +45,62 @@
  * @param <T>
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
-    BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>, R> {
-  private SparkDeleteHelper() {
+public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends
+    BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>, R> {
+  private HoodieDeleteHelper() {
   }
 
   private static class DeleteHelperHolder {
-    private static final SparkDeleteHelper SPARK_DELETE_HELPER = new 
SparkDeleteHelper();
+    private static final HoodieDeleteHelper HOODIE_DELETE_HELPER = new 
HoodieDeleteHelper<>();
   }
 
-  public static SparkDeleteHelper newInstance() {
-    return DeleteHelperHolder.SPARK_DELETE_HELPER;
+  public static HoodieDeleteHelper newInstance() {
+    return DeleteHelperHolder.HOODIE_DELETE_HELPER;
   }
 
   @Override
-  public JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, 
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> table, int parallelism) {
+  public HoodieData<HoodieKey> deduplicateKeys(HoodieData<HoodieKey> keys, 
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table, int parallelism) {
     boolean isIndexingGlobal = table.getIndex().isGlobal();
     if (isIndexingGlobal) {
-      return keys.keyBy(HoodieKey::getRecordKey)
-          .reduceByKey((key1, key2) -> key1, parallelism)
-          .values();
+      return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
     } else {
-      return keys.distinct(parallelism);
+      return keys.distinct();
     }
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(String instantTime,
-                                                           JavaRDD<HoodieKey> 
keys,
-                                                           HoodieEngineContext 
context,
-                                                           HoodieWriteConfig 
config,
-                                                           HoodieTable<T, 
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                           
BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>, R> deleteExecutor) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String 
instantTime,
+                                                              
HoodieData<HoodieKey> keys,
+                                                              
HoodieEngineContext context,
+                                                              
HoodieWriteConfig config,
+                                                              HoodieTable<T, 
HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> 
table,
+                                                              
BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>, R> deleteExecutor) {
     try {
-      HoodieWriteMetadata result = null;
-      JavaRDD<HoodieKey> dedupedKeys = keys;
+      HoodieData<HoodieKey> dedupedKeys = keys;
       final int parallelism = config.getDeleteShuffleParallelism();
       if (config.shouldCombineBeforeDelete()) {
         // De-dupe/merge if needed
         dedupedKeys = deduplicateKeys(keys, table, parallelism);
-      } else if (!keys.partitions().isEmpty()) {
-        dedupedKeys = keys.repartition(parallelism);

Review comment:
      This (the `keys.repartition(parallelism)` branch) was Spark-specific logic; we need to discuss the implications of dropping it from the engine-agnostic helper.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to