nsivabalan commented on code in PR #13976:
URL: https://github.com/apache/hudi/pull/13976#discussion_r2389726751


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java:
##########
@@ -87,12 +97,24 @@ public void commitToMetadataTable(HoodieTable table,
   private HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieData<WriteStatus> dataTableWriteStatuses,
                                                              HoodieTableMetadataWriter metadataWriter,
                                                              HoodieTable table,
-                                                             String instantTime) {
-    HoodieData<WriteStatus> allWriteStatus = dataTableWriteStatuses;
+                                                             String instantTime,
+                                                             boolean enforceCoalesceWithRepartition,
+                                                             int coalesceDividentForDataTableWrites) {
     HoodieData<WriteStatus> mdtWriteStatuses = metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime);
-    allWriteStatus = allWriteStatus.union(mdtWriteStatuses);
-    allWriteStatus.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
-    return allWriteStatus;
+    mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
+    HoodieData<WriteStatus> coalescedDataWriteStatuses;
+    int coalesceParallelism = Math.max(1, dataTableWriteStatuses.getNumPartitions() / coalesceDividentForDataTableWrites);
+    if (enforceCoalesceWithRepartition) {
+      // With bulk insert and NONE sort mode, a plain coalesce on the data table
+      // write statuses also impacts the record key generation stages,
+      // so we add a partitioner to cut the chain, ensuring coalesce(1) here
+      // does not impact the record key generation stages.
+      coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
+          .mapToPair((PairFunction<WriteStatus, Boolean, WriteStatus>) writeStatus -> new Tuple2(true, writeStatus))

Review Comment:
   Not sure I get your question.
   We are just doing a hashCode() of the entry within CoalescingPartitioner, which is `WriteStatus` in this case.
   Why do we need a new key?
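   
   For context, a minimal standalone sketch of what I mean (class and method names here are hypothetical, not the actual Hudi `CoalescingPartitioner`): the target partition can be derived from the entry's own `hashCode()`, so no separate key is needed, and the coalesce parallelism is simply the input partition count divided by the configured divisor, floored at 1.
   
   ```java
   // Hypothetical sketch, NOT the real CoalescingPartitioner implementation.
   public class HashCoalescingSketch {
   
       // Mirrors the Partitioner contract: map an entry to [0, numPartitions).
       // floorMod guards against negative hashCode() values.
       static int getPartition(Object entry, int numPartitions) {
           return Math.floorMod(entry.hashCode(), numPartitions);
       }
   
       // Target parallelism as computed in the PR: input partitions / divisor, floored at 1.
       static int coalesceParallelism(int numPartitions, int divisor) {
           return Math.max(1, numPartitions / divisor);
       }
   
       public static void main(String[] args) {
           // 200 input partitions with divisor 100 -> 2 output partitions
           int parallelism = coalesceParallelism(200, 100);
           System.out.println(parallelism);
           // Any entry (e.g. a WriteStatus) lands in a valid partition index.
           System.out.println(getPartition("some-write-status", parallelism));
       }
   }
   ```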
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
