kbuci commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2998503660
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java:
##########
@@ -236,6 +236,93 @@ public void testLogCompactionOnMORTable() throws Exception {
storageConf, Arrays.asList(dataGen.getPartitionPaths()));
}
+ /**
+ * Verify that log compaction is only scheduled when the number of delta commits since the last
+ * compaction or log compaction meets or exceeds the LogCompactionBlocksThreshold, and that
+ * the counter resets after each log compaction.
+ */
+ @Test
+ public void testLogCompactionSchedulingRespectsThreshold() throws Exception {
+ int logCompactionThreshold = 3;
Review Comment:
Sure, removed the FT and added unit tests with mocks instead.
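For readers following the thread, here is a minimal plain-Java sketch of the scheduling rule that the test Javadoc describes. It is not Hudi code. The `Instant` record, the `Action` enum, and `shouldScheduleLogCompaction` are hypothetical names. The timeline is modeled as a time-ordered list of completed instants, and the sketch glosses over how Hudi actually records compactions and log compactions on its timeline.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the threshold rule in the test Javadoc; not Hudi's API.
 */
public class LogCompactionThresholdSketch {

  // Only the actions that matter for this rule (hypothetical, not Hudi's action names).
  enum Action { DELTA_COMMIT, COMPACTION, LOG_COMPACTION }

  // A completed instant on a time-ordered timeline (oldest first).
  record Instant(String requestedTime, Action action) {}

  /** Counts delta commits after the most recent compaction or log compaction. */
  static int deltaCommitsSinceLastCompaction(List<Instant> timeline) {
    int count = 0;
    // Walk from newest to oldest and stop at the first compaction or log compaction.
    for (int i = timeline.size() - 1; i >= 0; i--) {
      Action action = timeline.get(i).action();
      if (action == Action.COMPACTION || action == Action.LOG_COMPACTION) {
        break; // the count effectively resets at each (log) compaction
      }
      if (action == Action.DELTA_COMMIT) {
        count++;
      }
    }
    return count;
  }

  /** Schedules only when the count meets or exceeds the threshold. */
  static boolean shouldScheduleLogCompaction(List<Instant> timeline, int threshold) {
    return deltaCommitsSinceLastCompaction(timeline) >= threshold;
  }
}
```

With a threshold of 3, three delta commits after the last (log) compaction are enough to schedule a log compaction. A new log compaction resets the count to zero.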
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns the most recent log compaction instant from the raw active timeline.
+ *
+ * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+ * @return The latest log compaction instant, or empty if no log compaction is present.
+ */
+ public static Option<HoodieInstant> getLastLogCompaction(final HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOption = Option.fromJavaOptional(
+ rawActiveTimeline
+ .filterPendingLogCompactionTimeline()
+ .getReverseOrderedInstants()
+ .findFirst()
+ );
+ if (!lastLogCompactionInstantOption.isPresent()) {
+ return Option.empty();
+ }
+ String logCompactionTimestamp = lastLogCompactionInstantOption.get().requestedTime();
+ Option<HoodieInstant> completedInstant = Option.fromJavaOptional(
+ rawActiveTimeline
+ .findInstantsInClosedRange(logCompactionTimestamp, logCompactionTimestamp)
+ .getInstantsAsStream()
+ .filter(HoodieInstant::isCompleted)
+ .findFirst()
+ );
+ return completedInstant.isPresent() ? completedInstant : lastLogCompactionInstantOption;
+ }
+
+ /**
+ * Returns a pair of (timeline containing the delta commits after the latest completed
+ * log compaction delta commit, the completed log compaction commit instant), if the latest completed
+ * log compaction commit is present; a pair of (timeline containing all the delta commits,
+ * the first delta commit instant), if there is no completed log compaction commit.
+ *
+ * @param deltaCommitTimeline Active timeline of table that contains only delta commits.
+ * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompletedLogCompaction(
+ final HoodieTimeline deltaCommitTimeline,
+ final HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionOption = getLastLogCompaction(rawActiveTimeline);
Review Comment:
Sure, updated the names.
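To make the two new `CompactionUtils` helpers easier to follow, here is a simplified model of what they compute. It is a hypothetical sketch, not Hudi's implementation, and it rests on four assumptions:

- A completed log compaction appears on the raw timeline under a different action than its pending states, with the same requested time. The timestamp lookup in `getLastLogCompaction` suggests this; the sketch models it as a completed `DELTA_COMMIT`.
- Requested times compare correctly as strings.
- Only completed delta commits count.
- The method returns `Optional.empty()` when there are no delta commits at all. The diff does not show this part of the method.

Unlike the real method, the sketch derives the delta-commit list from the raw timeline instead of taking a separate `deltaCommitTimeline` parameter.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of the two CompactionUtils helpers; not Hudi's API. */
public class LogCompactionTimelineSketch {

  enum Action { DELTA_COMMIT, LOG_COMPACTION }

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  // One state of one instant on a "raw" timeline, which keeps every state of every instant.
  record Instant(String requestedTime, Action action, State state) {
    boolean isCompleted() {
      return state == State.COMPLETED;
    }
  }

  /** Latest log compaction: its completed form if one exists, otherwise its pending form. */
  static Optional<Instant> getLastLogCompaction(List<Instant> rawTimeline) {
    // Among pending (requested/inflight) log compaction states, pick the newest requested time.
    Optional<Instant> lastPending = rawTimeline.stream()
        .filter(i -> i.action() == Action.LOG_COMPACTION && !i.isCompleted())
        .max(Comparator.comparing(Instant::requestedTime));
    if (lastPending.isEmpty()) {
      return Optional.empty();
    }
    String ts = lastPending.get().requestedTime();
    // Assumption: a completed instant with the same requested time is that log compaction's completion.
    Optional<Instant> completed = rawTimeline.stream()
        .filter(i -> i.requestedTime().equals(ts) && i.isCompleted())
        .findFirst();
    return completed.isPresent() ? completed : lastPending;
  }

  /** (delta commits after the latest completed log compaction, that instant), else (all, first). */
  static Optional<Map.Entry<List<Instant>, Instant>> getDeltaCommitsSinceLatestCompletedLogCompaction(
      List<Instant> rawTimeline) {
    // Completed delta commits in requested-time order; a completed log compaction is one of them.
    List<Instant> deltaCommits = rawTimeline.stream()
        .filter(i -> i.action() == Action.DELTA_COMMIT && i.isCompleted())
        .sorted(Comparator.comparing(Instant::requestedTime))
        .toList();
    Optional<Instant> last = getLastLogCompaction(rawTimeline);
    if (last.isPresent() && last.get().isCompleted()) {
      String ts = last.get().requestedTime();
      // Strictly after the completed log compaction, so it is not counted itself.
      List<Instant> after = deltaCommits.stream()
          .filter(i -> i.requestedTime().compareTo(ts) > 0)
          .toList();
      return Optional.of(Map.entry(after, last.get()));
    }
    if (deltaCommits.isEmpty()) {
      return Optional.empty(); // assumption: nothing to report without delta commits
    }
    return Optional.of(Map.entry(deltaCommits, deltaCommits.get(0)));
  }
}
```

For example, take a raw timeline with completed delta commits `001`, `002`, and `004`, plus log compaction `003` in requested, inflight, and completed states. The second helper yields the delta commits after `003` (only `004`), paired with the completed `003` instant.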
--
This is an automated message from the Apache Git Service.