nsivabalan commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2072399706
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -318,25 +326,41 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, String c
}
compactionTimer = metrics.getCompactionCtx();
HoodieWriteMetadata<T> writeMetadata = table.compact(context, compactionInstantTime);
- HoodieWriteMetadata<O> compactionMetadata = convertToOutputMetadata(writeMetadata);
- if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
- completeCompaction(compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
+ HoodieWriteMetadata<O> compactionWriteMetadata = convertToOutputMetadata(writeMetadata);
+ if (shouldComplete) {
+ commitCompaction(compactionInstantTime, compactionWriteMetadata, Option.of(table));
}
- return compactionMetadata;
+ return compactionWriteMetadata;
}
- /**
- * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
- *
- * @param compactionInstantTime Compaction Instant Time
- * @param metadata All the metadata that gets stored along with a commit
- * @param extraMetadata Extra Metadata to be stored
- */
- public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
- extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
- completeCompaction(metadata, createTable(config, context.getStorageConf()), compactionInstantTime);
+ public void commitCompaction(String compactionInstantTime, HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt) {
+ // dereferencing the write dag for compaction for the first time.
+ List<HoodieWriteStat> writeStats = triggerWritesAndFetchWriteStats(compactionWriteMetadata);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(true);
+ for (HoodieWriteStat stat : writeStats) {
+ commitMetadata.addWriteStat(stat.getPartitionPath(), stat);
+ }
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+ HoodieTable table = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
+ Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
+     .getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), compactionInstantTime);
+
+ if (schemaPair.getLeft().isPresent()) {
+ commitMetadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
Review Comment:
With this patch, we are moving the entire HoodieCommitMetadata preparation from the internal layers (RunCompactionActionExecutor) to the commitCompaction methods in BaseHoodieTableServiceClient.
Since the write DAG is not triggered at that point, RunCompactionActionExecutor can only partially populate HoodieWriteMetadata. The WriteStatuses and HoodieWriteStats cannot be set until we reach the commitCompaction method in BaseHoodieTableServiceClient.
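The lazy-evaluation point above can be illustrated with a minimal sketch. Everything in it is hypothetical. `SimpleWriteStat`, `LazyCompactionResult` and `LazyCompactionDemo` are illustrative stand-ins and not Hudi classes, and a plain `Supplier` takes the place of the Spark write DAG. In the sketch, the "executor" returns only a description of the pending writes. The write stats come into existence when the commit path calls `fetchWriteStats()`, which is roughly the role `triggerWritesAndFetchWriteStats` plays in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified per-file write statistic (not Hudi's HoodieWriteStat).
final class SimpleWriteStat {
  final String partitionPath;
  final long numWrites;

  SimpleWriteStat(String partitionPath, long numWrites) {
    this.partitionPath = partitionPath;
    this.numWrites = numWrites;
  }
}

// Hypothetical lazy result: holds a recipe for the writes, not the writes themselves.
final class LazyCompactionResult {
  private final Supplier<List<SimpleWriteStat>> pendingWrites;
  private List<SimpleWriteStat> cached; // memoized so the pending work runs only once
  private int evaluations = 0;

  LazyCompactionResult(Supplier<List<SimpleWriteStat>> pendingWrites) {
    this.pendingWrites = pendingWrites;
  }

  // First call dereferences the pending work; later calls reuse the cached stats.
  List<SimpleWriteStat> fetchWriteStats() {
    if (cached == null) {
      evaluations++;
      cached = pendingWrites.get();
    }
    return cached;
  }

  int evaluations() {
    return evaluations;
  }
}

final class LazyCompactionDemo {
  // The "executor" side can only describe the work; it has no stats to report yet.
  static LazyCompactionResult runCompaction(List<String> partitions) {
    return new LazyCompactionResult(() -> {
      List<SimpleWriteStat> stats = new ArrayList<>();
      for (String partition : partitions) {
        stats.add(new SimpleWriteStat(partition, 10L));
      }
      return stats;
    });
  }
}
```

The sketch memoizes the result so the pending work runs only once. This is a simplification: in Spark, an RDD that has not been persisted is recomputed on every action. That behavior is why it matters where the write DAG is first dereferenced.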
Rather than splitting this preparation across two places, I felt it was cleaner to move all of it to the one place where we actually need it.
We have taken a similar approach for all three table services (clustering, compaction, and log compaction).
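The sketch below shows what one shared commit-metadata path for clustering, compaction and log compaction might look like. It uses simplified, hypothetical types (`TableServiceType`, `FileWriteStat`, `SimpleCommitMetadata`, `CommitMetadataAssembler`) and placeholder key strings, and it is not Hudi's API. It follows the steps in the diff:

- group the write stats by partition,
- record the writer schema,
- when an internal schema is present, record it and override the schema entry with the matching Avro schema.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical enum for the three table services sharing one commit path.
enum TableServiceType { CLUSTERING, COMPACTION, LOG_COMPACTION }

// Hypothetical per-file stat (not Hudi's HoodieWriteStat).
final class FileWriteStat {
  final String partitionPath;
  final String fileId;

  FileWriteStat(String partitionPath, String fileId) {
    this.partitionPath = partitionPath;
    this.fileId = fileId;
  }
}

// Hypothetical commit metadata: stats grouped by partition plus extra key/value metadata.
final class SimpleCommitMetadata {
  // Placeholder keys; the real ones in the diff are HoodieCommitMetadata.SCHEMA_KEY
  // and SerDeHelper.LATEST_SCHEMA, whose actual string values are not assumed here.
  static final String SCHEMA_KEY = "schema";
  static final String LATEST_SCHEMA_KEY = "latest_schema";

  final TableServiceType serviceType;
  final Map<String, List<FileWriteStat>> statsByPartition = new HashMap<>();
  final Map<String, String> extraMetadata = new HashMap<>();

  SimpleCommitMetadata(TableServiceType serviceType) {
    this.serviceType = serviceType;
  }

  void addWriteStat(FileWriteStat stat) {
    statsByPartition.computeIfAbsent(stat.partitionPath, k -> new ArrayList<>()).add(stat);
  }
}

final class CommitMetadataAssembler {
  // The single place where commit metadata is assembled, after the writes have run.
  static SimpleCommitMetadata assemble(
      TableServiceType serviceType,
      List<FileWriteStat> writeStats,
      String writerSchema,
      Optional<String> internalSchema,
      Optional<String> avroSchema) {
    SimpleCommitMetadata metadata = new SimpleCommitMetadata(serviceType);
    for (FileWriteStat stat : writeStats) {
      metadata.addWriteStat(stat);
    }
    // Default: record the writer's configured schema.
    metadata.extraMetadata.put(SimpleCommitMetadata.SCHEMA_KEY, writerSchema);
    // If an evolved internal schema exists, record it and override the schema entry
    // with the corresponding Avro schema, mirroring the branch in the diff.
    if (internalSchema.isPresent()) {
      metadata.extraMetadata.put(SimpleCommitMetadata.LATEST_SCHEMA_KEY, internalSchema.get());
      metadata.extraMetadata.put(SimpleCommitMetadata.SCHEMA_KEY,
          avroSchema.orElseThrow(() -> new IllegalStateException("internal schema without Avro schema")));
    }
    return metadata;
  }
}
```

The assembler runs only after the writes have been triggered, so it sees the complete set of stats. This is the reason given above for not splitting the work between the executor and the client.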
--
This is an automated message from the Apache Git Service.