This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6f5e661 [HUDI-2769] Fix StreamerUtil#medianInstantTime for very near
instant time (#4005)
6f5e661 is described below
commit 6f5e661010a85f3426131111baff962a6d6ba91e
Author: Danny Chan <[email protected]>
AuthorDate: Tue Nov 16 13:46:34 2021 +0800
[HUDI-2769] Fix StreamerUtil#medianInstantTime for very near instant time
(#4005)
---
hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java | 8 +++++++-
.../src/test/java/org/apache/hudi/utils/TestStreamerUtil.java | 2 ++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 867621a..ddbd24e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -414,7 +415,12 @@ public class StreamerUtil {
ValidationUtils.checkArgument(high > low,
"Instant [" + highVal + "] should have newer timestamp than instant
[" + lowVal + "]");
long median = low + (high - low) / 2;
- return low >= median ? Option.empty() :
Option.of(HoodieActiveTimeline.formatInstantTime(new Date(median)));
+ final String instantTime = HoodieActiveTimeline.formatInstantTime(new
Date(median));
+ if (HoodieTimeline.compareTimestamps(lowVal,
HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)
+ || HoodieTimeline.compareTimestamps(highVal,
HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) {
+ return Option.empty();
+ }
+ return Option.of(instantTime);
} catch (ParseException e) {
throw new HoodieException("Get median instant time with interval [" +
lowVal + ", " + highVal + "] error", e);
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index c05e5b0..b9e2b91 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -86,6 +86,8 @@ public class TestStreamerUtil {
assertThrows(IllegalArgumentException.class,
() -> StreamerUtil.medianInstantTime(lower, higher),
"The first argument should have newer instant time");
+ // test very near instant time
+ assertFalse(StreamerUtil.medianInstantTime("20211116115634",
"20211116115633").isPresent());
}
@Test