This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bb8fc3e9f63 [HUDI-6929] Lazy loading dynamically for
CompletionTimeQueryView (#9898)
bb8fc3e9f63 is described below
commit bb8fc3e9f632a1fc3647fda63d482849355df2b7
Author: Danny Chan <[email protected]>
AuthorDate: Mon Oct 23 11:39:25 2023 +0800
[HUDI-6929] Lazy loading dynamically for CompletionTimeQueryView (#9898)
---
.../timeline/TestCompletionTimeQueryView.java | 1 +
.../table/timeline/CompletionTimeQueryView.java | 67 ++++++++++------------
.../table/timeline/HoodieArchivedTimeline.java | 20 +++++--
.../hudi/common/table/timeline/HoodieTimeline.java | 8 +++
4 files changed, 54 insertions(+), 42 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
index ae34f4d606f..9df49d4b9d0 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
@@ -82,6 +82,7 @@ public class TestCompletionTimeQueryView {
for (int i = 1; i < 3; i++) {
assertThat(view.getCompletionTime(String.format("%08d",
i)).orElse(""), is(String.format("%08d", i + 1000)));
}
+ assertThat("The cursor instant should be slided",
view.getCursorInstant(), is(String.format("%08d", 1)));
// query with inflight start time
assertFalse(view.getCompletionTime(String.format("%08d",
11)).isPresent());
// query with non-exist start time
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 290f31ff344..1e2881809f3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -30,8 +30,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
@@ -53,9 +51,12 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
private final Map<String, String> startToCompletionInstantTimeMap;
/**
- * The start instant time to eagerly load from, by default load last N days
of completed instants.
+ * The cursor instant time: completion times of instants at or after it have already been loaded.
+ * Initially it is the instant to eagerly load from (by default covering the last N days of completed instants).
+ * It is moved dynamically when lazy loading occurs: assuming an initial cursor instant t10,
+ * a completion-time query for t5 lazily loads the instants in [t5, t10) and updates the cursor instant to t5.
+ * Sliding the cursor this way avoids reloading the same range for different queries.
*/
- private final String startInstant;
+ private volatile String cursorInstant;
/**
* The first write instant on the active timeline, used for query
optimization.
@@ -75,12 +76,12 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
* The constructor.
*
* @param metaClient The table meta client.
- * @param startInstant The earliest instant time to eagerly load from, by
default load last N days of completed instants.
+ * @param cursorInstant The initial cursor instant, i.e. the earliest instant time to eagerly load from (by default the last N days of completed instants); the earlier of this value and the first instant of the active timeline is used.
*/
- public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String
startInstant) {
+ public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String
cursorInstant) {
this.metaClient = metaClient;
this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
- this.startInstant = minInstant(startInstant,
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+ this.cursorInstant = minInstant(cursorInstant,
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
// Note: use getWriteTimeline() to keep sync with the fs view
visibleCommitsAndCompactionTimeline, see
AbstractTableFileSystemView.refreshTimeline.
this.firstNonSavepointCommit =
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
load();
@@ -132,10 +133,10 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
// ==============================================================
// LEGACY CODE
// ==============================================================
- // Fixes the completion time to reflect the completion sequence
correctly
- // if the file slice base instant time is not in datetime format. For
example,
- // 1. many test cases just use integer string as the instant time.
- // 2. MDT uses compaction instant time as [delta_instant] + "001".
+ // Fixes the completion time to reflect the completion sequence correctly
+ // when the file slice base instant time is not in datetime format, e.g.:
+ // 1. many test cases just use integer strings as instant times;
+ // 2. MDT uses compaction instant times of the pattern [delta_instant] + "001".
// CAUTION: this fix only works for OCC(Optimistic Concurrency
Control).
// for NB-CC(Non-blocking Concurrency Control), the file slicing may
be incorrect.
@@ -157,17 +158,23 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
if (completionTime != null) {
return Option.of(completionTime);
}
- if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN,
this.startInstant)) {
+ if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN_OR_EQUALS,
this.cursorInstant)) {
// the instant is still pending
return Option.empty();
}
// the 'startTime' should be out of the eager loading range, switch to a
lazy loading.
// This operation is resource costly.
- HoodieArchivedTimeline.loadInstants(metaClient,
- new EqualsTimestampFilter(startTime),
- HoodieArchivedTimeline.LoadMode.SLIM,
- r -> true,
- this::readCompletionTime);
+ synchronized (this) {
+ if (HoodieTimeline.compareTimestamps(startTime, LESSER_THAN,
this.cursorInstant)) {
+ HoodieArchivedTimeline.loadInstants(metaClient,
+ new HoodieArchivedTimeline.ClosedOpenTimeRangeFilter(startTime,
this.cursorInstant),
+ HoodieArchivedTimeline.LoadMode.SLIM,
+ r -> true,
+ this::readCompletionTime);
+ // [startTime, cursor) is loaded now: slide the cursor back; never move it forward if another thread already slid it lower
+ this.cursorInstant = startTime;
+ }
+ }
return
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
}
@@ -183,7 +190,7 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
.forEach(instant -> setCompletionTime(instant.getTimestamp(),
instant.getCompletionTime()));
// then load the archived instants.
HoodieArchivedTimeline.loadInstants(metaClient,
- new HoodieArchivedTimeline.StartTsFilter(this.startInstant),
+ new HoodieArchivedTimeline.StartTsFilter(this.cursorInstant),
HoodieArchivedTimeline.LoadMode.SLIM,
r -> true,
this::readCompletionTime);
@@ -206,28 +213,12 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 :
instant2;
}
+ public String getCursorInstant() {
+ return cursorInstant;
+ }
+
@Override
public void close() throws Exception {
this.startToCompletionInstantTimeMap.clear();
}
-
- // -------------------------------------------------------------------------
- // Inner class
- // -------------------------------------------------------------------------
-
- /**
- * A time based filter with equality of specified timestamp.
- */
- public static class EqualsTimestampFilter extends
HoodieArchivedTimeline.TimeRangeFilter {
- private final String ts;
-
- public EqualsTimestampFilter(String ts) {
- super(ts, ts); // endTs is never used
- this.ts = ts;
- }
-
- public boolean isInRange(String instantTime) {
- return HoodieTimeline.compareTimestamps(instantTime, EQUALS, ts);
- }
- }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index bdd5750684e..cdffd4c0b3c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -288,8 +288,8 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
* A time based filter with range (startTs, endTs].
*/
public static class TimeRangeFilter {
- private final String startTs;
- private final String endTs;
+ protected final String startTs;
+ protected final String endTs;
public TimeRangeFilter(String startTs, String endTs) {
this.startTs = startTs;
@@ -301,15 +301,27 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
}
}
+ /**
+ * A time based filter with the half-open range [startTs, endTs): startTs inclusive, endTs exclusive.
+ */
+ public static class ClosedOpenTimeRangeFilter extends TimeRangeFilter {
+
+ public ClosedOpenTimeRangeFilter(String startTs, String endTs) {
+ super(startTs, endTs);
+ }
+
+ public boolean isInRange(String instantTime) {
+ return HoodieTimeline.isInClosedOpenRange(instantTime, this.startTs,
this.endTs);
+ }
+ }
+
/**
* A time based filter with range [startTs, +∞).
*/
public static class StartTsFilter extends TimeRangeFilter {
- private final String startTs;
public StartTsFilter(String startTs) {
super(startTs, null); // endTs is never used
- this.startTs = startTs;
}
public boolean isInRange(String instantTime) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 8644204ab82..82ec439bd25 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -423,6 +423,14 @@ public interface HoodieTimeline extends Serializable {
&& HoodieTimeline.compareTimestamps(timestamp,
LESSER_THAN_OR_EQUALS, endTs);
}
+ /**
+ * Return true if specified timestamp is in the half-open range [startTs, endTs), i.e. startTs inclusive, endTs exclusive.
+ */
+ static boolean isInClosedOpenRange(String timestamp, String startTs, String
endTs) {
+ return HoodieTimeline.compareTimestamps(timestamp, GREATER_THAN_OR_EQUALS,
startTs)
+ && HoodieTimeline.compareTimestamps(timestamp, LESSER_THAN, endTs);
+ }
+
/**
* Return true if specified timestamp is in range [startTs, endTs].
*/