Hey there,
Hope everyone is well!
I have a question:
```
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<Integer> dataStream = env.addSource(new CustomSource());
OutputTag<Integer> outputTag = new OutputTag<Integer>("late") {};

WatermarkStrategy<Integer> waStrat = WatermarkStrategy
    .<Integer>forMonotonousTimestamps()
    .withTimestampAssigner((i, timestamp) -> Long.valueOf(i));

SingleOutputStreamOperator<Integer> windowOperator =
    dataStream
        .assignTimestampsAndWatermarks(waStrat)
        .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
        .sideOutputLateData(outputTag)
        .apply(
            new AllWindowFunction<Integer, Integer, TimeWindow>() {
                @Override
                public void apply(
                        TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
                    System.out.println(window.getStart() + " -> " + window.getEnd());
                    for (Integer val : values) {
                        System.out.println(val + " in window ");
                    }
                }
            });

windowOperator
    .getSideOutput(outputTag)
    .flatMap(
        new FlatMapFunction<Integer, String>() {
            @Override
            public void flatMap(Integer value, Collector<String> out) {
                System.out.println("LATE: " + value);
            }
        });

env.execute();
```
And my custom source:
```
static class CustomSource implements SourceFunction<Integer> {
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        int i = 0;
        while (i++ < 10) {
            Thread.sleep(100);
            if (i < 5) {
                ctx.collect(i * 10);
            } else {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
    }
}
```
If I leave `Thread.sleep(100)` in the source, I get the following
output:
```
10 -> 11
10 in window
20 -> 21
20 in window
LATE: 5
30 -> 31
30 in window
LATE: 6
LATE: 7
LATE: 8
LATE: 9
LATE: 10
40 -> 41
40 in window
```
If I comment out the sleep, I get:
```
5 -> 6
5 in window
6 -> 7
6 in window
7 -> 8
7 in window
8 -> 9
8 in window
9 -> 10
9 in window
10 -> 11
10 in window
10 in window
20 -> 21
20 in window
30 -> 31
30 in window
40 -> 41
40 in window
```
I thought that with `forMonotonousTimestamps` the late events would
automatically be dropped from the window and emitted to the side output,
since the first 4 elements have a higher timestamp than the last 6.
Instead, it seems that all elements emitted by the source are evaluated
at once, and the watermark is only emitted after that batch of events
has been processed.
I assume there is a better approach than sleeping between the `collect`
calls in the source to get the expected result?
(The background here is that I want to write tests for some event-time
processing and assert that the side output is used for late events and
that these are processed correctly. In production the Kafka source will
not always emit data continuously, so I want to test this, but I have
not found anything better than sleeping. Maybe that is the correct
approach?)
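
The difference between the two runs can be modelled in plain Java, without Flink. This is only a rough sketch under assumptions, not Flink's actual implementation: it assumes the watermark under monotonous timestamps is the highest timestamp seen so far minus 1, that an element of a 1 ms tumbling window with no allowed lateness counts as late once the watermark has reached its timestamp, and that the watermark is only refreshed every `watermarkEvery` elements. The names `WatermarkTimingSketch`, `lateElements` and `watermarkEvery` are hypothetical and only used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkTimingSketch {

    // Hypothetical helper: returns the elements that would be treated as late
    // for 1 ms tumbling windows with zero allowed lateness, when the watermark
    // is refreshed only after every `watermarkEvery` processed elements.
    static List<Integer> lateElements(int[] timestamps, int watermarkEvery) {
        List<Integer> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;   // highest timestamp seen so far
        long watermark = Long.MIN_VALUE; // last watermark that was emitted
        int sinceLastWatermark = 0;

        for (int ts : timestamps) {
            // The window [ts, ts + 1) has max timestamp ts, so the element is
            // late once the watermark has reached ts.
            if (ts <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            if (++sinceLastWatermark >= watermarkEvery) {
                // Assumed rule for monotonous timestamps: max timestamp - 1.
                watermark = maxSeen - 1;
                sinceLastWatermark = 0;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Same sequence that CustomSource emits.
        int[] emitted = {10, 20, 30, 40, 5, 6, 7, 8, 9, 10};
        // Watermark refreshed after every element (similar to the run with sleep).
        System.out.println("per element: " + lateElements(emitted, 1));
        // Watermark refreshed only after the whole batch (similar to the run without sleep).
        System.out.println("per batch:   " + lateElements(emitted, emitted.length));
    }
}
```

In this model the first call prints `[5, 6, 7, 8, 9, 10]` and the second prints `[]`, which matches the late elements seen with and without the sleep.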
Best regards,
Dario