Hey there,
When one uses .window(TumblingEventTimeWindows.of(SOME_TIME)) it will
never window any values if the user uses
WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
.withTimestampAssigner((t, timestamp) -> t.f0)
)
I was wondering whether Flink should throw an Exception at the start of
the programming and prevent the use of it as
no values would ever reach the process function.
If so I would create a ticket and love to work on it.
Here is an example:
public class PlaygroundJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
Configuration());
DataStreamSource<Tuple2<Long, Integer>> source =
env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
@Override
public void run(SourceContext<Tuple2<Long, Integer>>
sourceContext) throws Exception {
int i = 0;
while (true) {
Tuple2<Long, Integer> tuple =
Tuple2.of(System.currentTimeMillis(), i++ % 10);
sourceContext.collect(tuple);
}
}
@Override
public void cancel() {
}
});
source.assignTimestampsAndWatermarks(
// Switch noWatermarks() to forMonotonousTimestamps()
// and values are being printed.
WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
.withTimestampAssigner((t, timestamp) -> t.f0)
).keyBy(t -> t.f1)
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.process(new ProcessWindowFunction<Tuple2<Long,
Integer>, String, Integer, TimeWindow>() {
@Override
public void process(Integer key, Context context,
Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws
Exception {
int count = 0;
Iterator<Tuple2<Long, Integer>> iter =
iterable.iterator();
while (iter.hasNext()) {
count++;
iter.next();
}
out.collect("Key: " + key + " count: " + count);
}
}).print();
env.execute();
}
}
Best regards,
Dario