nsivabalan commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2933930511
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceLastCompactio
return Option.empty();
}
+ private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction()
{
+ HoodieActiveTimeline rawActiveTimeline =
table.getMetaClient().getTableFormat()
+ .getTimelineFactory().createActiveTimeline(table.getMetaClient(),
false);
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestLogCompaction(
+ table.getActiveTimeline().getDeltaCommitTimeline(),
rawActiveTimeline);
+ if (deltaCommitsInfo.isPresent()) {
+ return Option.of(Pair.of(
+ deltaCommitsInfo.get().getLeft().countInstants(),
+ deltaCommitsInfo.get().getRight().requestedTime()));
+ }
+ return Option.empty();
+ }
+
private boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
- Option<Pair<Integer, String>> latestDeltaCommitInfoOption =
getLatestDeltaCommitInfo();
- if (!latestDeltaCommitInfoOption.isPresent()) {
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption =
getLatestDeltaCommitInfoSinceCompaction();
+ if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
return false;
}
- Pair<Integer, String> latestDeltaCommitInfo =
latestDeltaCommitInfoOption.get();
+ Pair<Integer, String> latestDeltaCommitInfoSinceCompact =
latestDeltaCommitInfoSinceCompactOption.get();
if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
- return true;
+ // Log compaction schedule is triggered based on
getLogCompactionBlocksThreshold value.
+ // One deltacommit can create either one or more than one block
depending on the size of the write batch.
+ // In the worst case it would require approximately equal no. of
deltacommits to reach the LogCompactionBlocksThreshold value.
+ // Each logcompaction create one or more blocks, and transient failures
and retries can cause the number of blocks to
+ // exceed the LogCompactionBlocksThreshold value before the next time
log compaction scheduling is attempted.
+ // As a result, LogCompactionBlocksThreshold is treated as a threshold,
where if the approximate number of deltacommits
+ // since the last compaction and log compaction meets this threshold,
then log compaction should be scheduled.
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption
= getDeltaCommitInfoSinceLogCompaction();
+ int numDeltaCommitsSinceLatestCompaction =
latestDeltaCommitInfoSinceCompact.getLeft();
+ int numDeltaCommitsSinceLatestLogCompaction =
latestDeltaCommitInfoSinceLogCompactOption.isPresent()
+ ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
+ : 0;
+
+ int numDeltaCommitsSinceLatestCompactionOrLogCompaction =
Math.min(numDeltaCommitsSinceLatestCompaction,
numDeltaCommitsSinceLatestLogCompaction);
+ log.info("There have been {} delta commits since last compaction or log
compaction.", numDeltaCommitsSinceLatestCompactionOrLogCompaction);
Review Comment:
Let's not log at info level every time.
It's OK to log at info level when we are about to return `true` from here;
otherwise, it might be noisy.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceLastCompactio
return Option.empty();
}
+ private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction()
{
Review Comment:
Shouldn't this be `getLatestDeltaCommitInfoSinceLogCompaction` (to align the
naming with the existing method)?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns a pair of (timeline containing the delta commits after the latest
completed
+ * log compaction delta commit, the completed log compaction commit
instant), if the latest completed
+ * log compaction commit is present; a pair of (timeline containing all the
delta commits,
+ * the first delta commit instant), if there is no completed log compaction
commit.
+ *
+ * @param deltaCommitTimeline Active timeline of table that contains only
delta commits.
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLatestLogCompaction(
+ final HoodieTimeline deltaCommitTimeline,
+ final HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOption =
Option.fromJavaOptional(
+ rawActiveTimeline
+ .filterPendingLogCompactionTimeline()
+ .getReverseOrderedInstants()
+ .findFirst()
+ );
+
+ if (lastLogCompactionInstantOption.isPresent()) {
+ // Search for a corresponding completed delta commit for the latest log
compact instant observed.
+ // If a delta commit is found, then that means the last compact instant
was completed.
+ // Otherwise return empty, since that means there is a pending log
compaction that has not
+ // been executed yet. The latter scenario should not happen in practice,
as log compaction
+ // scheduling is only done after previous log compact pending instants
have been executed or
+ // rolled back.
+ String lastLogCompactionTimestamp =
lastLogCompactionInstantOption.get().requestedTime();
+ Option<HoodieInstant> lastCompletedLogCompactionInstantOption =
Option.fromJavaOptional(
+ deltaCommitTimeline
+ .filterCompletedInstants()
+ .filter(hoodieInstant ->
hoodieInstant.requestedTime().equals(lastLogCompactionTimestamp))
+ .getInstantsAsStream()
+ .findFirst()
+ );
+ if (lastCompletedLogCompactionInstantOption.isPresent()) {
+ HoodieInstant lastCompletedLogCompactionInstant =
lastCompletedLogCompactionInstantOption.get();
+ return Option.of(Pair.of(deltaCommitTimeline.findInstantsAfter(
+ lastCompletedLogCompactionInstant.requestedTime(),
Integer.MAX_VALUE), lastCompletedLogCompactionInstant));
+ } else {
+ LOG.info("Last log compaction instant {}, is in pending state so
returning empty value.", lastLogCompactionTimestamp);
Review Comment:
Let's be judicious with info logging.
Can you confirm that this is logged only occasionally?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceLastCompactio
return Option.empty();
}
+ private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction()
{
+ HoodieActiveTimeline rawActiveTimeline =
table.getMetaClient().getTableFormat()
+ .getTimelineFactory().createActiveTimeline(table.getMetaClient(),
false);
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestLogCompaction(
+ table.getActiveTimeline().getDeltaCommitTimeline(),
rawActiveTimeline);
+ if (deltaCommitsInfo.isPresent()) {
+ return Option.of(Pair.of(
+ deltaCommitsInfo.get().getLeft().countInstants(),
+ deltaCommitsInfo.get().getRight().requestedTime()));
+ }
+ return Option.empty();
+ }
+
private boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
- Option<Pair<Integer, String>> latestDeltaCommitInfoOption =
getLatestDeltaCommitInfo();
- if (!latestDeltaCommitInfoOption.isPresent()) {
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption =
getLatestDeltaCommitInfoSinceCompaction();
+ if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
return false;
}
- Pair<Integer, String> latestDeltaCommitInfo =
latestDeltaCommitInfoOption.get();
+ Pair<Integer, String> latestDeltaCommitInfoSinceCompact =
latestDeltaCommitInfoSinceCompactOption.get();
if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
- return true;
+ // Log compaction schedule is triggered based on
getLogCompactionBlocksThreshold value.
+ // One deltacommit can create either one or more than one block
depending on the size of the write batch.
Review Comment:
Let's move this into a private method and keep this method leaner.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns a pair of (timeline containing the delta commits after the latest
completed
+ * log compaction delta commit, the completed log compaction commit
instant), if the latest completed
+ * log compaction commit is present; a pair of (timeline containing all the
delta commits,
+ * the first delta commit instant), if there is no completed log compaction
commit.
+ *
+ * @param deltaCommitTimeline Active timeline of table that contains only
delta commits.
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLatestLogCompaction(
+ final HoodieTimeline deltaCommitTimeline,
+ final HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOption =
Option.fromJavaOptional(
+ rawActiveTimeline
+ .filterPendingLogCompactionTimeline()
+ .getReverseOrderedInstants()
+ .findFirst()
+ );
+
+ if (lastLogCompactionInstantOption.isPresent()) {
+ // Search for a corresponding completed delta commit for the latest log
compact instant observed.
+ // If a delta commit is found, then that means the last compact instant
was completed.
+ // Otherwise return empty, since that means there is a pending log
compaction that has not
+ // been executed yet. The latter scenario should not happen in practice,
as log compaction
+ // scheduling is only done after previous log compact pending instants
have been executed or
+ // rolled back.
+ String lastLogCompactionTimestamp =
lastLogCompactionInstantOption.get().requestedTime();
+ Option<HoodieInstant> lastCompletedLogCompactionInstantOption =
Option.fromJavaOptional(
+ deltaCommitTimeline
+ .filterCompletedInstants()
+ .filter(hoodieInstant ->
hoodieInstant.requestedTime().equals(lastLogCompactionTimestamp))
+ .getInstantsAsStream()
+ .findFirst()
+ );
+ if (lastCompletedLogCompactionInstantOption.isPresent()) {
+ HoodieInstant lastCompletedLogCompactionInstant =
lastCompletedLogCompactionInstantOption.get();
+ return Option.of(Pair.of(deltaCommitTimeline.findInstantsAfter(
Review Comment:
Should we use `findInstantsModifiedAfterByCompletionTime` here?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns a pair of (timeline containing the delta commits after the latest
completed
+ * log compaction delta commit, the completed log compaction commit
instant), if the latest completed
+ * log compaction commit is present; a pair of (timeline containing all the
delta commits,
+ * the first delta commit instant), if there is no completed log compaction
commit.
+ *
+ * @param deltaCommitTimeline Active timeline of table that contains only
delta commits.
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLatestLogCompaction(
Review Comment:
Suggest renaming this to `getDeltaCommitsSinceLatestCompletedLogCompaction`.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLa
}
}
+ /**
+ * Returns a pair of (timeline containing the delta commits after the latest
completed
+ * log compaction delta commit, the completed log compaction commit
instant), if the latest completed
+ * log compaction commit is present; a pair of (timeline containing all the
delta commits,
+ * the first delta commit instant), if there is no completed log compaction
commit.
+ *
+ * @param deltaCommitTimeline Active timeline of table that contains only
delta commits.
+ * @param rawActiveTimeline Active timeline of table, that has current and
previous states of each instant.
+ * @return Pair of timeline containing delta commits and an instant.
+ */
+ public static Option<Pair<HoodieTimeline, HoodieInstant>>
getDeltaCommitsSinceLatestLogCompaction(
+ final HoodieTimeline deltaCommitTimeline,
+ final HoodieActiveTimeline rawActiveTimeline) {
+ Option<HoodieInstant> lastLogCompactionInstantOption =
Option.fromJavaOptional(
+ rawActiveTimeline
+ .filterPendingLogCompactionTimeline()
Review Comment:
How about we introduce `filterLogCompactionTimeline()`
and then process the latest instant from it?
That way we can avoid polling the timeline twice, right?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,84 @@ private Option<Pair<Integer, String>>
getLatestDeltaCommitInfoSinceLastCompactio
return Option.empty();
}
+ private Option<Pair<Integer, String>> getDeltaCommitInfoSinceLogCompaction()
{
+ HoodieActiveTimeline rawActiveTimeline =
table.getMetaClient().getTableFormat()
+ .getTimelineFactory().createActiveTimeline(table.getMetaClient(),
false);
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestLogCompaction(
+ table.getActiveTimeline().getDeltaCommitTimeline(),
rawActiveTimeline);
+ if (deltaCommitsInfo.isPresent()) {
+ return Option.of(Pair.of(
+ deltaCommitsInfo.get().getLeft().countInstants(),
+ deltaCommitsInfo.get().getRight().requestedTime()));
+ }
+ return Option.empty();
+ }
+
private boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
- Option<Pair<Integer, String>> latestDeltaCommitInfoOption =
getLatestDeltaCommitInfo();
- if (!latestDeltaCommitInfoOption.isPresent()) {
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption =
getLatestDeltaCommitInfoSinceCompaction();
+ if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
return false;
}
- Pair<Integer, String> latestDeltaCommitInfo =
latestDeltaCommitInfoOption.get();
+ Pair<Integer, String> latestDeltaCommitInfoSinceCompact =
latestDeltaCommitInfoSinceCompactOption.get();
if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
- return true;
+ // Log compaction schedule is triggered based on
getLogCompactionBlocksThreshold value.
+ // One deltacommit can create either one or more than one block
depending on the size of the write batch.
+ // In the worst case it would require approximately equal no. of
deltacommits to reach the LogCompactionBlocksThreshold value.
+ // Each logcompaction create one or more blocks, and transient failures
and retries can cause the number of blocks to
+ // exceed the LogCompactionBlocksThreshold value before the next time
log compaction scheduling is attempted.
+ // As a result, LogCompactionBlocksThreshold is treated as a threshold,
where if the approximate number of deltacommits
+ // since the last compaction and log compaction meets this threshold,
then log compaction should be scheduled.
+ Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption
= getDeltaCommitInfoSinceLogCompaction();
+ int numDeltaCommitsSinceLatestCompaction =
latestDeltaCommitInfoSinceCompact.getLeft();
+ int numDeltaCommitsSinceLatestLogCompaction =
latestDeltaCommitInfoSinceLogCompactOption.isPresent()
+ ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
+ : 0;
+
+ int numDeltaCommitsSinceLatestCompactionOrLogCompaction =
Math.min(numDeltaCommitsSinceLatestCompaction,
numDeltaCommitsSinceLatestLogCompaction);
Review Comment:
This name is too long.
Can we just use `numDeltaCommitSince`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]