Hey there,

When one uses .window(TumblingEventTimeWindows.of(SOME_TIME)) it will never window any values if the user uses

WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                        .withTimestampAssigner((t, timestamp) -> t.f0)
)

I was wondering whether Flink should throw an Exception at the start of the programming and prevent the use of it as
no values would ever reach the process function.

If so I would create a ticket and love to work on it.

Here is an example:

public class PlaygroundJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<Tuple2<Long, Integer>> source = env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                int i = 0;
                while (true) {
                    Tuple2<Long, Integer> tuple = Tuple2.of(System.currentTimeMillis(), i++ % 10);
                    sourceContext.collect(tuple);
                }
            }

            @Override
            public void cancel() {
            }

        });

        source.assignTimestampsAndWatermarks(
                // Switch noWatermarks() to forMonotonousTimestamps()
                // and values are being printed.
                WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                        .withTimestampAssigner((t, timestamp) -> t.f0)
                ).keyBy(t -> t.f1)
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context, Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
                        int count = 0;
                        Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                        while (iter.hasNext()) {
                            count++;
                            iter.next();
                        }

                        out.collect("Key: " + key + " count: " + count);

                    }
                }).print();

        env.execute();
    }
}

Best regards,

Dario

Reply via email to