nsivabalan commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2072399706


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -318,25 +326,41 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, String c
     }
     compactionTimer = metrics.getCompactionCtx();
     HoodieWriteMetadata<T> writeMetadata = table.compact(context, compactionInstantTime);
-    HoodieWriteMetadata<O> compactionMetadata = convertToOutputMetadata(writeMetadata);
-    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
-      completeCompaction(compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
+    HoodieWriteMetadata<O> compactionWriteMetadata = convertToOutputMetadata(writeMetadata);
+    if (shouldComplete) {
+      commitCompaction(compactionInstantTime, compactionWriteMetadata, Option.of(table));
     }
-    return compactionMetadata;
+    return compactionWriteMetadata;
   }
 
-  /**
-   * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
-   *
-   * @param compactionInstantTime Compaction Instant Time
-   * @param metadata              All the metadata that gets stored along with a commit
-   * @param extraMetadata         Extra Metadata to be stored
-   */
-  public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
-    completeCompaction(metadata, createTable(config, context.getStorageConf()), compactionInstantTime);
+  public void commitCompaction(String compactionInstantTime, HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt) {
+    // dereferencing the write dag for compaction for the first time.
+    List<HoodieWriteStat> writeStats = triggerWritesAndFetchWriteStats(compactionWriteMetadata);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(true);
+    for (HoodieWriteStat stat : writeStats) {
+      commitMetadata.addWriteStat(stat.getPartitionPath(), stat);
+    }
+    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+    HoodieTable table = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
+    Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
+        .getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), compactionInstantTime);
+
+    if (schemaPair.getLeft().isPresent()) {
+      commitMetadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
Review Comment:
   With this patch, we move the entire HoodieCommitMetadata preparation from the internal layers (RunCompactionActionExecutor) to the commitCompaction methods in BaseHoodieTableServiceClient.
   Since we are not triggering the dag at that point, we can only partly populate HoodieWriteMetadata within RunCompactionActionExecutor; we have to wait until we reach the commitCompaction method in BaseHoodieTableServiceClient to set the WriteStatuses and HoodieWriteStat.
   
   Instead of having the preparation split across two places, I felt it was cleaner to move all of it to the one place where we actually need it.
   
   We have taken the same approach for all three table services (clustering, compaction, and log compaction).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]