nsivabalan commented on code in PR #13976:
URL: https://github.com/apache/hudi/pull/13976#discussion_r2389726751
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java:
##########
@@ -87,12 +97,24 @@ public void commitToMetadataTable(HoodieTable table,
private HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieData<WriteStatus> dataTableWriteStatuses,
HoodieTableMetadataWriter metadataWriter,
HoodieTable table,
- String instantTime) {
- HoodieData<WriteStatus> allWriteStatus = dataTableWriteStatuses;
+ String instantTime,
+ boolean enforceCoalesceWithRepartition,
+ int coalesceDividentForDataTableWrites) {
HoodieData<WriteStatus> mdtWriteStatuses = metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime);
- allWriteStatus = allWriteStatus.union(mdtWriteStatuses);
- allWriteStatus.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
- return allWriteStatus;
+ mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
+ HoodieData<WriteStatus> coalescedDataWriteStatuses;
+ int coalesceParallelism = Math.max(1, dataTableWriteStatuses.getNumPartitions() / coalesceDividentForDataTableWrites);
+ if (enforceCoalesceWithRepartition) {
+ // with bulk insert and NONE sort mode, simple coalesce on datatable write statuses also impact record key generation stages.
+ // and hence we are adding a partitioner to cut the chain so that coalesce(1) here does not impact record key generation stages.
+ coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
+ .mapToPair((PairFunction<WriteStatus, Boolean, WriteStatus>) writeStatus -> new Tuple2(true, writeStatus))
Review Comment:
Not sure I get your question.
We are just calling `hashCode()` on the entry within `CoalescingPartitioner`,
which is the `WriteStatus` in this case.
Why do we need a new key?
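The hunk does not show `CoalescingPartitioner` itself. Assuming it follows the common hash-modulo scheme (a non-negative `hashCode() % numPartitions`, as Spark's `HashPartitioner` does), the sketch below shows why the choice of hashed object matters here: every pair built in `mapToPair` carries the same key, `true`, so hashing that key sends all records to one partition, whereas hashing each distinct entry spreads them. `HashModPartitioner` and `histogram` are hypothetical names used only for illustration.

```java
import java.util.List;

// Hypothetical stand-in for a hash-based partitioner; the real CoalescingPartitioner
// in the PR is not shown in this hunk and may differ.
final class HashModPartitioner {
  private final int numPartitions;

  HashModPartitioner(int numPartitions) {
    if (numPartitions < 1) {
      throw new IllegalArgumentException("numPartitions must be >= 1");
    }
    this.numPartitions = numPartitions;
  }

  // Non-negative modulo of hashCode(), the same scheme Spark's HashPartitioner uses;
  // null maps to partition 0.
  int getPartition(Object entry) {
    if (entry == null) {
      return 0;
    }
    int mod = entry.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  // Counts how many entries land in each partition.
  int[] histogram(List<?> entries) {
    int[] counts = new int[numPartitions];
    for (Object e : entries) {
      counts[getPartition(e)]++;
    }
    return counts;
  }
}
```

With 4 partitions, four `true` keys all land in partition 3 (`Boolean.TRUE.hashCode()` is 1231), while the integers 0 to 7 spread evenly at two per partition.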
--
This is an automated message from the Apache Git Service.