gaborgsomogyi commented on code in PR #18118:
URL: https://github.com/apache/flink/pull/18118#discussion_r965893345


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IntervalJoinITCase.java:
##########
@@ -155,6 +159,128 @@ public void testJoinsCorrectlyWithMultipleKeys() throws 
Exception {
                 "(key2,5):(key2,5)");
     }
 
+    private DataStream<Tuple2<String, Integer>> buildSourceStream(
+            final StreamExecutionEnvironment env, final SourceConsumer 
sourceConsumer) {
+        return env.addSource(
+                new SourceFunction<Tuple2<String, Integer>>() {
+                    @Override
+                    public void run(SourceContext<Tuple2<String, Integer>> 
ctx) {
+                        sourceConsumer.accept(ctx);
+                    }
+
+                    @Override
+                    public void cancel() {
+                        // do nothing
+                    }
+                });
+    }
+
+    // Ensure consumer func is serializable
+    private interface SourceConsumer
+            extends Serializable, 
Consumer<SourceFunction.SourceContext<Tuple2<String, Integer>>> {
+        long serialVersionUID = 1L;
+    }
+
+    @Test
+    public void testIntervalJoinSideOutputLeftLateData() throws Exception {
+
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<Tuple2<String, Integer>> streamOne =
+                buildSourceStream(
+                        env,
+                        (ctx) -> {
+                            ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
+                            ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
+                            ctx.emitWatermark(new Watermark(3));
+                            ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L); 
// late data
+                        });
+
+        DataStream<Tuple2<String, Integer>> streamTwo =
+                buildSourceStream(
+                        env,
+                        (ctx) -> {
+                            ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
+                            ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
+                            ctx.emitWatermark(new Watermark(2));
+                            ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
+                        });
+
+        OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, 
Integer>>("late") {};
+
+        SingleOutputStreamOperator<String> process =
+                streamOne
+                        .keyBy(new Tuple2KeyExtractor())
+                        .intervalJoin(streamTwo.keyBy(new 
Tuple2KeyExtractor()))
+                        .between(Time.milliseconds(-1), Time.milliseconds(1))
+                        .sideOutputLeftLateData(late)
+                        .process(new CombineToStringJoinFunction());
+
+        process.getSideOutput(late)

Review Comment:
   For completeness, one last beautification could be added: we can extract 
a helper function, something like the one below, and call it from both places:
   ```
   private void addSinkToSideOutput(...) {
   ...
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to