Dario Heinisch created FLINK-24623:
--------------------------------------

             Summary: Prevent usage of EventTimeWindows when EventTime is disabled
                 Key: FLINK-24623
                 URL: https://issues.apache.org/jira/browse/FLINK-24623
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
            Reporter: Dario Heinisch

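The following pipeline never emits any values downstream of the window operator, because the watermark strategy generates no watermarks and the event-time windows therefore never fire:

{code:java}
import java.util.Iterator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PlaygroundJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<Tuple2<Long, Integer>> source =
                env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                    // Flipped by cancel() so that run() can terminate.
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                        int i = 0;
                        while (running) {
                            // f0 = timestamp in milliseconds, f1 = key in the range 0..9
                            Tuple2<Long, Integer> tuple = Tuple2.of(System.currentTimeMillis(), i++ % 10);
                            sourceContext.collect(tuple);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        source.assignTimestampsAndWatermarks(
                        // Switch noWatermarks() to forMonotonousTimestamps()
                        // and values are being printed.
                        WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                .withTimestampAssigner((t, timestamp) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(
                            Integer key,
                            Context context,
                            Iterable<Tuple2<Long, Integer>> iterable,
                            Collector<String> out) throws Exception {
                        // Count the elements that were assigned to this window.
                        int count = 0;
                        Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                        while (iter.hasNext()) {
                            count++;
                            iter.next();
                        }
                        out.collect("Key: " + key + " count: " + count);
                    }
                })
                .print();

        env.execute();
    }
}
{code}

The issue is that the stream uses _noWatermarks()_, which never emits a watermark. Event time therefore never advances, and no event-time window ever fires. Because this pipeline can never produce any window output, it is faulty, and Flink should throw an exception when starting up.

--------------------

Proposed change:

We extend the interface [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55] with the method _boolean isEventTime()_.
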
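To make the intent of _isEventTime()_ concrete, here is a minimal, self-contained sketch. It does not use Flink: SketchWatermarkStrategy is a hypothetical stand-in for WatermarkStrategy, and the default return value of true, the override in noWatermarks(), and the delegation in withTimestampAssigner() are all assumptions about how the proposed method could behave, not existing Flink behavior. The delegation is included because the pipeline above calls withTimestampAssigner() on the result of noWatermarks(), so a wrapping strategy would presumably need to forward the answer of the strategy it wraps.

```java
import java.util.function.ToLongFunction;

public class IsEventTimeSketch {

    /** Hypothetical stand-in for Flink's WatermarkStrategy; NOT the real interface. */
    interface SketchWatermarkStrategy<T> {

        // Proposed addition (assumption): strategies are event-time capable by default.
        default boolean isEventTime() {
            return true;
        }

        // Assumption: attaching a timestamp assigner wraps the strategy,
        // so the wrapper forwards isEventTime() to the wrapped strategy.
        default SketchWatermarkStrategy<T> withTimestampAssigner(ToLongFunction<T> assigner) {
            final SketchWatermarkStrategy<T> base = this;
            return new SketchWatermarkStrategy<T>() {
                @Override
                public boolean isEventTime() {
                    return base.isEventTime();
                }
            };
        }

        // Stand-in for a strategy that does emit watermarks.
        static <T> SketchWatermarkStrategy<T> forMonotonousTimestamps() {
            return new SketchWatermarkStrategy<T>() {};
        }

        // Stand-in for the strategy that never emits watermarks.
        static <T> SketchWatermarkStrategy<T> noWatermarks() {
            return new SketchWatermarkStrategy<T>() {
                @Override
                public boolean isEventTime() {
                    return false;
                }
            };
        }
    }

    public static void main(String[] args) {
        // false: no watermarks are ever generated
        System.out.println(SketchWatermarkStrategy.<Long>noWatermarks().isEventTime());
        // true: falls back to the default
        System.out.println(SketchWatermarkStrategy.<Long>forMonotonousTimestamps().isEventTime());
        // false: a timestamp assigner alone does not enable event time
        System.out.println(
                SketchWatermarkStrategy.<Long>noWatermarks()
                        .withTimestampAssigner(t -> t)
                        .isEventTime());
    }
}
```

Defaulting to true would keep existing custom strategies unaffected, at the cost of not detecting user-defined strategies that also never emit watermarks.
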
We create a new class named _EventTimeWindowPreconditions_ and add the following method to it, which makes use of _isEventTime()_:

{code:java}
public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
    // Walk the list backwards so that the most recently added transformation is checked first.
    for (int i = predecessors.size() - 1; i >= 0; i--) {
        final Transformation<?> pre = predecessors.get(i);

        if (pre instanceof TimestampsAndWatermarksTransformation) {
            final TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
                    (TimestampsAndWatermarksTransformation<?>) pre;
            final WatermarkStrategy<?> waStrat =
                    timestampsAndWatermarksTransformation.getWatermarkStrategy();

            // Assert that the strategy supports event time, or throw an exception.
            if (!waStrat.isEventTime()) {
                // TODO: Custom exception
                throw new IllegalArgumentException(
                        "Cannot use an event-time window with a preceding watermark strategy that"
                                + " does not support event time. Did you use noWatermarks() as the WatermarkStrategy"
                                + " together with event-time windows such as"
                                + " TumblingEventTimeWindows/SlidingEventTimeWindows?"
                                + " These windows will never fire because your stream does not support event time.");
            }

            // We have to terminate the check now, as we have found the most recent
            // timestamp assigner for this window and ensured that it actually supports
            // event time. If a watermark strategy such as noWatermarks() was used earlier
            // in the chain, we can safely ignore it, because another valid event-time
            // watermark strategy exists between it and our current event-time window.
            break;
        }
    }
}
{code}

Then we can update the constructors of [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112] and [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79] to:

{code:java}
if (windowAssigner.isEventTime()) {
    EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(
            input.getTransformation().getInputs());
}
{code}

This is the approach I currently have in mind, but I am not sure whether it is the best one.

Best regards,

Dario

--
This message was sent by Atlassian Jira
(v8.3.4#803005)