Hey there,

Hope everyone is well!

I have a question:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Integer> dataStream = env.addSource(new CustomSource());

        OutputTag<Integer> outputTag = new OutputTag<Integer>("late") {};

        WatermarkStrategy<Integer> waStrat = WatermarkStrategy
                .<Integer>forMonotonousTimestamps()
                .withTimestampAssigner((i, timestamp) -> Long.valueOf(i));

        SingleOutputStreamOperator<Integer> windowOperator =
                dataStream
                        .assignTimestampsAndWatermarks(waStrat)
.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
                        .sideOutputLateData(outputTag)
                        .apply(
                                new AllWindowFunction<Integer, Integer, TimeWindow>() {
                                    @Override
                                    public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
System.out.println(window.getStart() + " -> " + window.getEnd());
                                        for (Integer val : values) {
                                            System.out.println(val + " in window ");
                                        }
                                    }
                                });

        windowOperator
                .getSideOutput(outputTag)
                .flatMap(
                        new FlatMapFunction<Integer, String>() {
                            @Override
                            public void flatMap(Integer value, Collector<String> out) {
                                System.out.println("LATE: " + value);
                            }
                        });

        env.execute();
```

And my custom source:
```
static class CustomSource implements SourceFunction<Integer> {

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            int i = 0;
            while (i++ < 10) {
                Thread.sleep(100);
                if (i < 5) {
                    ctx.collect(i * 10);
                } else {
                    ctx.collect(i);
                }

            }
        }

        @Override
        public void cancel() {

        }
    }
```

Now in the source if I leave `Thread.sleep(100)` in there I get the following output:
```
10 -> 11
10 in window
20 -> 21
20 in window
LATE: 5
30 -> 31
30 in window
LATE: 6
LATE: 7
LATE: 8
LATE: 9
LATE: 10
40 -> 41
40 in window
```

If I comment out the sleep, I get:
```
5 -> 6
5 in window
6 -> 7
6 in window
7 -> 8
7 in window
8 -> 9
8 in window
9 -> 10
9 in window
10 -> 11
10 in window
10 in window
20 -> 21
20 in window
30 -> 31
30 in window
40 -> 41
40 in window
```

I thought by using `forMonotonousTimestamps` it would automatically drop the late events from the window and emit them to the side output since the first 4 elements will have a higher timestamp than the last 5 but it seems it evaluates all elements emitted by the source at once and then emit the watermark after one batch of events by the source has been processed.

I assume there would be a better approach then to sleep in between collecting in the source for getting the expected result?

( The background here is that I want to write tests for some event time processing and assert that the SideOutput will be used for late events and these are being processed correctly. In production the Kafka source will not always emit data continuously, hence I want to test this but I have not found anything better than sleeping, maybe that is the correct approach?)

Best regards,

Dario

Reply via email to