asfgit closed pull request #6449: [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp. URL: https://github.com/apache/flink/pull/6449
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java index 0c449e64f41..43085cb42c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java @@ -152,6 +152,7 @@ public IntervalJoinOperator( @Override public void open() throws Exception { super.open(); + collector = new TimestampedCollector<>(output); context = new ContextImpl(userFunction); internalTimerService = @@ -204,15 +205,15 @@ public void processElement2(StreamRecord<T2> record) throws Exception { } @SuppressWarnings("unchecked") - private <OUR, OTHER> void processElement( - StreamRecord<OUR> record, - MapState<Long, List<BufferEntry<OUR>>> ourBuffer, - MapState<Long, List<BufferEntry<OTHER>>> otherBuffer, - long relativeLowerBound, - long relativeUpperBound, - boolean isLeft) throws Exception { - - final OUR ourValue = record.getValue(); + private <THIS, OTHER> void processElement( + final StreamRecord<THIS> record, + final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, + final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, + final long relativeLowerBound, + final long relativeUpperBound, + final boolean isLeft) throws Exception { + + final THIS ourValue = record.getValue(); final long ourTimestamp = record.getTimestamp(); if (ourTimestamp == Long.MIN_VALUE) { @@ -257,14 +258,18 @@ private boolean isLate(long timestamp) { } private void collect(T1 left, T2 
right, long leftTimestamp, long rightTimestamp) throws Exception { - long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); + final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); + collector.setAbsoluteTimestamp(resultTimestamp); - context.leftTimestamp = leftTimestamp; - context.rightTimestamp = rightTimestamp; + context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp); + userFunction.processElement(left, right, context, collector); } - private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception { + private static <T> void addToBuffer( + final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer, + final T value, + final long timestamp) throws Exception { List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp); if (elemsInBucket == null) { elemsInBucket = new ArrayList<>(); @@ -313,6 +318,8 @@ public void onProcessingTime(InternalTimer<K, String> timer) throws Exception { */ private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context { + private long resultTimestamp = Long.MIN_VALUE; + private long leftTimestamp = Long.MIN_VALUE; private long rightTimestamp = Long.MIN_VALUE; @@ -321,6 +328,12 @@ private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) { func.super(); } + private void updateTimestamps(long left, long right, long result) { + this.leftTimestamp = left; + this.rightTimestamp = right; + this.resultTimestamp = result; + } + @Override public long getLeftTimestamp() { return leftTimestamp; @@ -333,7 +346,7 @@ public long getRightTimestamp() { @Override public long getTimestamp() { - return leftTimestamp; + return resultTimestamp; } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java index ee3f4d8fc12..53f514b98da 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java @@ -481,13 +481,16 @@ public void testReturnsCorrectTimestamp() throws Exception { TestElem.serializer(), TestElem.serializer(), new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() { + + private static final long serialVersionUID = 1L; + @Override public void processElement( TestElem left, TestElem right, Context ctx, Collector<Tuple2<TestElem, TestElem>> out) throws Exception { - Assert.assertEquals(left.ts, ctx.getTimestamp()); + Assert.assertEquals(Math.max(left.ts, right.ts), ctx.getTimestamp()); } } ); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services