yihua commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2078706902
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -318,25 +320,33 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?,
I, ?, T> table, String c
}
compactionTimer = metrics.getCompactionCtx();
HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<O> compactionMetadata =
convertToOutputMetadata(writeMetadata);
- if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
- completeCompaction(compactionMetadata.getCommitMetadata().get(), table,
compactionInstantTime);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(writeMetadata);
+ if (shouldComplete) {
+ commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
Review Comment:
Similarly here
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -481,23 +508,15 @@ public HoodieWriteMetadata<O> cluster(String
clusteringInstant, boolean shouldCo
throw new HoodieClusteringException("Non clustering replace-commit
inflight at timestamp " + clusteringInstant);
}
}
+
clusteringTimer = metrics.getClusteringCtx();
LOG.info("Starting clustering at {} for table {}", clusteringInstant,
table.getConfig().getBasePath());
HoodieWriteMetadata<T> writeMetadata = table.cluster(context,
clusteringInstant);
HoodieWriteMetadata<O> clusteringMetadata =
convertToOutputMetadata(writeMetadata);
- // Publish file creation metrics for clustering.
- if (config.isMetricsOn()) {
- clusteringMetadata.getWriteStats()
- .ifPresent(hoodieWriteStats -> hoodieWriteStats.stream()
- .filter(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats() !=
null)
- .map(hoodieWriteStat ->
hoodieWriteStat.getRuntimeStats().getTotalCreateTime())
- .forEach(metrics::updateClusteringFileCreationMetrics));
- }
-
// TODO : Where is shouldComplete used ?
- if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
- completeClustering((HoodieReplaceCommitMetadata)
clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
+ if (shouldComplete) {
+ commitClustering(clusteringMetadata, table, clusteringInstant);
Review Comment:
Similarly, is `clusteringMetadata.getCommitMetadata()` always present?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -514,23 +533,65 @@ public boolean purgePendingClustering(String
clusteringInstant) {
return false;
}
+ protected abstract HoodieWriteMetadata<O>
convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
+
+ private Map<String, List<String>>
getPartitionToReplacedFileIds(HoodieClusteringPlan clusteringPlan,
HoodieWriteMetadata<?> writeMetadata) {
+ Set<HoodieFileGroupId> newFilesWritten =
writeMetadata.getWriteStats().get().stream()
+ .map(s -> new HoodieFileGroupId(s.getPartitionPath(),
s.getFileId())).collect(Collectors.toSet());
+
+ return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+ .filter(fg ->
"org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy"
Review Comment:
Is this Spark specific? It does not seem to fit in the base class here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -481,23 +508,15 @@ public HoodieWriteMetadata<O> cluster(String
clusteringInstant, boolean shouldCo
throw new HoodieClusteringException("Non clustering replace-commit
inflight at timestamp " + clusteringInstant);
}
}
+
clusteringTimer = metrics.getClusteringCtx();
LOG.info("Starting clustering at {} for table {}", clusteringInstant,
table.getConfig().getBasePath());
HoodieWriteMetadata<T> writeMetadata = table.cluster(context,
clusteringInstant);
HoodieWriteMetadata<O> clusteringMetadata =
convertToOutputMetadata(writeMetadata);
- // Publish file creation metrics for clustering.
Review Comment:
Should this be kept?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -236,8 +238,8 @@ protected HoodieWriteMetadata<O> logCompact(String
logCompactionInstantTime, boo
WriteMarkersFactory.get(config.getMarkersType(), table,
logCompactionInstantTime);
HoodieWriteMetadata<T> writeMetadata = table.logCompact(context,
logCompactionInstantTime);
HoodieWriteMetadata<O> logCompactionMetadata =
convertToOutputMetadata(writeMetadata);
- if (shouldComplete &&
logCompactionMetadata.getCommitMetadata().isPresent()) {
Review Comment:
Is `logCompactionMetadata.getCommitMetadata()` always present now?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -369,6 +379,23 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, HoodieTable tab
LOG.info("Compacted successfully on commit {}", compactionCommitTime);
}
+ public void commitLogCompaction(String compactionInstantTime,
HoodieWriteMetadata<O> writeMetadata, Option<HoodieTable> tableOpt) {
+ // dereferencing the write dag for log compaction for the first time.
+ List<HoodieWriteStat> writeStats =
triggerWritesAndFetchWriteStats(writeMetadata);
Review Comment:
`triggerWritesAndFetchWriteStats` should not be engine-specific. Should it
be implemented based on `HoodieData`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -236,8 +238,8 @@ protected HoodieWriteMetadata<O> logCompact(String
logCompactionInstantTime, boo
WriteMarkersFactory.get(config.getMarkersType(), table,
logCompactionInstantTime);
HoodieWriteMetadata<T> writeMetadata = table.logCompact(context,
logCompactionInstantTime);
HoodieWriteMetadata<O> logCompactionMetadata =
convertToOutputMetadata(writeMetadata);
- if (shouldComplete &&
logCompactionMetadata.getCommitMetadata().isPresent()) {
- completeLogCompaction(logCompactionMetadata.getCommitMetadata().get(),
table, logCompactionInstantTime);
+ if (shouldComplete) {
+ commitLogCompaction(logCompactionInstantTime, logCompactionMetadata,
Option.of(table));
Review Comment:
And is this the bug that is being fixed?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -369,6 +379,23 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, HoodieTable tab
LOG.info("Compacted successfully on commit {}", compactionCommitTime);
}
+ public void commitLogCompaction(String compactionInstantTime,
HoodieWriteMetadata<O> writeMetadata, Option<HoodieTable> tableOpt) {
Review Comment:
This method looks very similar to `#commitCompaction`. Could we extract the
common part?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]