This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f5e661  [HUDI-2769] Fix StreamerUtil#medianInstantTime for very near 
instant time (#4005)
6f5e661 is described below

commit 6f5e661010a85f3426131111baff962a6d6ba91e
Author: Danny Chan <[email protected]>
AuthorDate: Tue Nov 16 13:46:34 2021 +0800

    [HUDI-2769] Fix StreamerUtil#medianInstantTime for very near instant time 
(#4005)
---
 hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java   | 8 +++++++-
 .../src/test/java/org/apache/hudi/utils/TestStreamerUtil.java     | 2 ++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 867621a..ddbd24e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -414,7 +415,12 @@ public class StreamerUtil {
       ValidationUtils.checkArgument(high > low,
           "Instant [" + highVal + "] should have newer timestamp than instant 
[" + lowVal + "]");
       long median = low + (high - low) / 2;
-      return low >= median ? Option.empty() : 
Option.of(HoodieActiveTimeline.formatInstantTime(new Date(median)));
+      final String instantTime = HoodieActiveTimeline.formatInstantTime(new 
Date(median));
+      if (HoodieTimeline.compareTimestamps(lowVal, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)
+          || HoodieTimeline.compareTimestamps(highVal, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) {
+        return Option.empty();
+      }
+      return Option.of(instantTime);
     } catch (ParseException e) {
       throw new HoodieException("Get median instant time with interval [" + 
lowVal + ", " + highVal + "] error", e);
     }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index c05e5b0..b9e2b91 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -86,6 +86,8 @@ public class TestStreamerUtil {
     assertThrows(IllegalArgumentException.class,
         () -> StreamerUtil.medianInstantTime(lower, higher),
         "The first argument should have newer instant time");
+    // test very near instant time
+    assertFalse(StreamerUtil.medianInstantTime("20211116115634", 
"20211116115633").isPresent());
   }
 
   @Test

Reply via email to