yihua commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2078706902


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -318,25 +320,33 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, String c
     }
     compactionTimer = metrics.getCompactionCtx();
     HoodieWriteMetadata<T> writeMetadata = table.compact(context, compactionInstantTime);
-    HoodieWriteMetadata<O> compactionMetadata = convertToOutputMetadata(writeMetadata);
-    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
-      completeCompaction(compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
+    HoodieWriteMetadata<O> compactionWriteMetadata = convertToOutputMetadata(writeMetadata);
+    if (shouldComplete) {
+      commitCompaction(compactionInstantTime, compactionWriteMetadata, Option.of(table));

Review Comment:
   Similar here



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -481,23 +508,15 @@ public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldCo
         throw new HoodieClusteringException("Non clustering replace-commit inflight at timestamp " + clusteringInstant);
       }
     }
+
     clusteringTimer = metrics.getClusteringCtx();
     LOG.info("Starting clustering at {} for table {}", clusteringInstant, table.getConfig().getBasePath());
     HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(writeMetadata);
 
-    // Publish file creation metrics for clustering.
-    if (config.isMetricsOn()) {
-      clusteringMetadata.getWriteStats()
-          .ifPresent(hoodieWriteStats -> hoodieWriteStats.stream()
-              .filter(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats() != null)
-              .map(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats().getTotalCreateTime())
-              .forEach(metrics::updateClusteringFileCreationMetrics));
-    }
-
     // TODO : Where is shouldComplete used ?
-    if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
+    if (shouldComplete) {
+      commitClustering(clusteringMetadata, table, clusteringInstant);

Review Comment:
   Similarly, is `clusteringMetadata.getCommitMetadata()` always present?
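
   To illustrate the concern: if `getCommitMetadata()` is not guaranteed to be present after `convertToOutputMetadata`, an unguarded `get()` inside `commitClustering` would throw a bare `NoSuchElementException`. A minimal, self-contained sketch of a defensive alternative (using `java.util.Optional` as a stand-in for Hudi's `Option`; the names here are illustrative, not the actual Hudi API):

   ```java
   import java.util.Optional;

   public class CommitMetadataGuard {
     // Stand-in for clusteringMetadata.getCommitMetadata(); illustrative only.
     static Optional<String> getCommitMetadata(boolean present) {
       return present ? Optional.of("replace-commit-metadata") : Optional.empty();
     }

     public static void main(String[] args) {
       // If presence is not actually guaranteed on the new code path, failing
       // fast with a descriptive error beats an unguarded Optional#get.
       String metadata = getCommitMetadata(true).orElseThrow(
           () -> new IllegalStateException("Commit metadata missing for clustering instant"));
       System.out.println(metadata);  // prints "replace-commit-metadata"
     }
   }
   ```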



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -514,23 +533,65 @@ public boolean purgePendingClustering(String clusteringInstant) {
     return false;
   }
 
+  protected abstract HoodieWriteMetadata<O> convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
+
+  private Map<String, List<String>> getPartitionToReplacedFileIds(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<?> writeMetadata) {
+    Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
+        .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
+
+    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+        .filter(fg -> "org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy"

Review Comment:
   Is this Spark specific?  It does not seem to fit in the base class here.
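
   One way to keep the Spark-only strategy class name out of the common module: the base class calls an abstract hook, and each engine-specific subclass supplies the check. A self-contained sketch of that shape (class and method names are illustrative, not the actual Hudi API):

   ```java
   // Base class stays engine-agnostic by delegating the strategy check.
   abstract class BaseServiceClientSketch {
     // Engine-specific subclasses decide which execution strategies keep
     // file groups even when no new file is written for them.
     abstract boolean isSingleFileSortStrategy(String strategyClassName);
   }

   class SparkServiceClientSketch extends BaseServiceClientSketch {
     @Override
     boolean isSingleFileSortStrategy(String strategyClassName) {
       // The Spark-only class name now lives in the Spark module.
       return "org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy"
           .equals(strategyClassName);
     }
   }

   public class StrategyHookDemo {
     public static void main(String[] args) {
       BaseServiceClientSketch client = new SparkServiceClientSketch();
       System.out.println(client.isSingleFileSortStrategy(
           "org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy"));
       // prints "true"
     }
   }
   ```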



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -481,23 +508,15 @@ public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldCo
         throw new HoodieClusteringException("Non clustering replace-commit inflight at timestamp " + clusteringInstant);
       }
     }
+
     clusteringTimer = metrics.getClusteringCtx();
     LOG.info("Starting clustering at {} for table {}", clusteringInstant, table.getConfig().getBasePath());
     HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(writeMetadata);
 
-    // Publish file creation metrics for clustering.

Review Comment:
   Should this be kept?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -236,8 +238,8 @@ protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boo
     WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
     HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, logCompactionInstantTime);
     HoodieWriteMetadata<O> logCompactionMetadata = convertToOutputMetadata(writeMetadata);
-    if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {

Review Comment:
   Is `logCompactionMetadata.getCommitMetadata()` always present now?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -369,6 +379,23 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
     LOG.info("Compacted successfully on commit {}", compactionCommitTime);
   }
 
+  public void commitLogCompaction(String compactionInstantTime, HoodieWriteMetadata<O> writeMetadata, Option<HoodieTable> tableOpt) {
+    // dereferencing the write dag for log compaction for the first time.
+    List<HoodieWriteStat> writeStats = triggerWritesAndFetchWriteStats(writeMetadata);

Review Comment:
   `triggerWritesAndFetchWriteStats` should not be engine-specific.  Should it be implemented based on `HoodieData`?
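
   A sketch of the suggested direction: if the write statuses reach the base class behind a `HoodieData`-style abstraction, dereferencing them needs no engine-specific code. The interface and types below are minimal stand-ins, not the real Hudi classes:

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   // Minimal stand-in for org.apache.hudi.common.data.HoodieData; illustrative only.
   interface DistributedData<T> {
     List<T> collectAsList();
   }

   public class TriggerWritesSketch {
     // Stand-in for a WriteStatus carrying its write stat; illustrative only.
     record WriteStatus(String stat) {}

     // Engine-agnostic: depends only on the HoodieData-style abstraction, so the
     // base class needs no Spark (or Flink/Java) specific dereferencing code.
     static List<String> triggerWritesAndFetchWriteStats(DistributedData<WriteStatus> statuses) {
       return statuses.collectAsList().stream()
           .map(WriteStatus::stat)
           .collect(Collectors.toList());
     }

     public static void main(String[] args) {
       DistributedData<WriteStatus> statuses =
           () -> List.of(new WriteStatus("s1"), new WriteStatus("s2"));
       System.out.println(triggerWritesAndFetchWriteStats(statuses));  // prints [s1, s2]
     }
   }
   ```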



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -236,8 +238,8 @@ protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boo
     WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
     HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, logCompactionInstantTime);
     HoodieWriteMetadata<O> logCompactionMetadata = convertToOutputMetadata(writeMetadata);
-    if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
-      completeLogCompaction(logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
+    if (shouldComplete) {
+      commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, Option.of(table));

Review Comment:
   And is this the bug that is being fixed?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -369,6 +379,23 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
     LOG.info("Compacted successfully on commit {}", compactionCommitTime);
   }
 
+  public void commitLogCompaction(String compactionInstantTime, HoodieWriteMetadata<O> writeMetadata, Option<HoodieTable> tableOpt) {

Review Comment:
   This method looks very similar to `#commitCompaction`. Could we extract the common part?
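
   A self-contained sketch of one possible extraction: a shared helper takes the parts that differ (the action name and the completion step) as parameters, and `commitCompaction`/`commitLogCompaction` become thin wrappers. All names here are hypothetical, not the actual Hudi API:

   ```java
   import java.util.List;
   import java.util.function.BiConsumer;

   public class CommitFlowSketch {
     // Hypothetical shared helper: runs the common flow once, then delegates
     // to the action-specific completion step supplied by the caller.
     static String commitTableService(String actionName,
                                      String instantTime,
                                      List<String> writeStats,
                                      BiConsumer<String, List<String>> completeFn) {
       completeFn.accept(instantTime, writeStats);       // differs per action
       return actionName + " committed at " + instantTime;  // shared logging/metrics spot
     }

     public static void main(String[] args) {
       // commitCompaction and commitLogCompaction would each pass their own
       // completion callback (completeCompaction / completeLogCompaction).
       System.out.println(commitTableService("compaction", "001",
           List.of("stat-a"), (instant, stats) -> { /* completeCompaction(...) */ }));
       System.out.println(commitTableService("log compaction", "002",
           List.of("stat-b"), (instant, stats) -> { /* completeLogCompaction(...) */ }));
     }
   }
   ```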



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]