Dario Heinisch created FLINK-24623:
--------------------------------------

             Summary: Prevent usage of EventTimeWindows when EventTime is disabled
                 Key: FLINK-24623
                 URL: https://issues.apache.org/jira/browse/FLINK-24623
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
            Reporter: Dario Heinisch


The following pipeline never emits any values after the window operator, because event time has been disabled via the WatermarkStrategy:


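{code:java}
import java.util.Iterator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PlaygroundJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Emits (currentTimeMillis, i % 10) tuples until the job is cancelled.
        DataStreamSource<Tuple2<Long, Integer>> source =
                env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                        int i = 0;
                        while (running) {
                            sourceContext.collect(Tuple2.of(System.currentTimeMillis(), i++ % 10));
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        source.assignTimestampsAndWatermarks(
                        // Switching noWatermarks() to forMonotonousTimestamps()
                        // makes values get printed.
                        WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                .withTimestampAssigner((t, timestamp) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context,
                            Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
                        // Count the elements that fell into this window.
                        int count = 0;
                        Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                        while (iter.hasNext()) {
                            count++;
                            iter.next();
                        }
                        out.collect("Key: " + key + " count: " + count);
                    }
                })
                .print();

        env.execute();
    }
}
{code}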
 

The issue is that the stream uses _noWatermarks()_, which never emits watermarks. As a result, event time never advances and event-time windows such as _TumblingEventTimeWindows_ never fire.

Because this pipeline can never produce any window results, it is effectively broken. Flink should throw an exception when starting up instead of running silently.
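The following simplified model shows why nothing is printed. It is plain Java with no Flink dependency and imitates how an event-time tumbling window fires. Like Flink's EventTimeTrigger, a window is emitted only once the watermark reaches its last timestamp (end - 1). The class and method names are hypothetical. The watermark is also updated per element here, whereas Flink emits it periodically. Treat it as a sketch of the mechanism, not as Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, Flink-independent model: a tumbling event-time window fires
 * only when the watermark reaches the window's last timestamp (end - 1).
 */
public final class EventTimeWindowModel {

    /** Returns a description of each fired window, in firing order. */
    public static List<String> run(long[] timestamps, long windowSize, boolean emitWatermarks) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> element count
        List<String> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // like noWatermarks(): never advances
        long maxTimestamp = Long.MIN_VALUE;

        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, windowSize);
            openWindows.merge(start, 1, Integer::sum);

            if (emitWatermarks) {
                // Like forMonotonousTimestamps(): watermark = max timestamp seen - 1.
                maxTimestamp = Math.max(maxTimestamp, ts);
                watermark = maxTimestamp - 1;
            }

            // Fire every open window whose last timestamp (end - 1) <= watermark.
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + windowSize - 1 <= watermark) {
                Map.Entry<Long, Integer> entry = openWindows.pollFirstEntry();
                fired.add("window@" + entry.getKey() + " count: " + entry.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {0, 500, 999, 1000, 1500, 2000};
        System.out.println("noWatermarks():            " + run(ts, 1000, false));
        System.out.println("forMonotonousTimestamps(): " + run(ts, 1000, true));
    }
}
```

With these timestamps, the noWatermarks() run returns an empty list. The monotonous run returns `[window@0 count: 3, window@1000 count: 2]`. The window starting at 2000 stays open because no later element pushes the watermark past its end.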

 

--------------------

Proposed change:

We extend the interface [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55] with the method _boolean isEventTime()_.
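One way to add the proposed flag without breaking existing implementations is a default method that returns true, overridden only by noWatermarks(). The sketch below uses SimpleWatermarkStrategy, a hypothetical stripped-down stand-in for WatermarkStrategy. It is not a Flink type, and Flink's WatermarkStrategy has no isEventTime() today. The sketch also shows a pitfall. Wrappers such as the strategy returned by withTimestampAssigner() must forward the flag. Otherwise noWatermarks().withTimestampAssigner(...), as used in the example job, would report true.

```java
import java.util.function.ToLongFunction;

/**
 * Hypothetical sketch: SimpleWatermarkStrategy stands in for Flink's
 * WatermarkStrategy only to illustrate the proposed isEventTime() flag.
 */
public final class IsEventTimeSketch {

    interface SimpleWatermarkStrategy<T> {

        /** Proposed flag; defaults to true so existing strategies keep working. */
        default boolean isEventTime() {
            return true;
        }

        /**
         * Decorator in the spirit of withTimestampAssigner(). The assigner is
         * ignored in this sketch; the point is that the wrapper forwards the flag.
         */
        default SimpleWatermarkStrategy<T> withTimestampAssigner(ToLongFunction<T> assigner) {
            final SimpleWatermarkStrategy<T> base = this;
            return new SimpleWatermarkStrategy<T>() {
                @Override
                public boolean isEventTime() {
                    return base.isEventTime();
                }
            };
        }

        /** Counterpart of noWatermarks(): event time never advances. */
        static <T> SimpleWatermarkStrategy<T> noWatermarks() {
            return new SimpleWatermarkStrategy<T>() {
                @Override
                public boolean isEventTime() {
                    return false;
                }
            };
        }

        /** Counterpart of forMonotonousTimestamps(): keeps the default (true). */
        static <T> SimpleWatermarkStrategy<T> forMonotonousTimestamps() {
            return new SimpleWatermarkStrategy<T>() {};
        }
    }

    public static void main(String[] args) {
        SimpleWatermarkStrategy<Long> none =
                SimpleWatermarkStrategy.<Long>noWatermarks().withTimestampAssigner(t -> t);
        SimpleWatermarkStrategy<Long> monotonous =
                SimpleWatermarkStrategy.<Long>forMonotonousTimestamps().withTimestampAssigner(t -> t);
        System.out.println("noWatermarks: " + none.isEventTime());
        System.out.println("forMonotonousTimestamps: " + monotonous.isEventTime());
    }
}
```

Running main() prints `noWatermarks: false` and `forMonotonousTimestamps: true`. A default of true keeps custom strategies unaffected, at the cost of trusting them to advance event time.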

We create a new class named _EventTimeWindowPreconditions_ and add the following method to it, which uses _isEventTime()_:

 
{code:java}
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;

public final class EventTimeWindowPreconditions {

    private EventTimeWindowPreconditions() {}

    public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            final Transformation<?> pre = predecessors.get(i);
            if (pre instanceof TimestampsAndWatermarksTransformation) {
                final TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
                        (TimestampsAndWatermarksTransformation<?>) pre;

                final WatermarkStrategy<?> watermarkStrategy =
                        timestampsAndWatermarksTransformation.getWatermarkStrategy();

                // Assert that the strategy generates event-time watermarks, or throw.
                // isEventTime() is the new method proposed above.
                if (!watermarkStrategy.isEventTime()) {
                    // TODO: Custom exception
                    throw new IllegalArgumentException(
                            "Cannot use an event-time window with a preceding watermark strategy that"
                                    + " does not generate event-time watermarks. Did you use noWatermarks()"
                                    + " as the WatermarkStrategy together with an event-time window such as"
                                    + " TumblingEventTimeWindows/SlidingEventTimeWindows? These windows will"
                                    + " never fire because event time never advances in this stream.");
                }

                // Stop here: we found the most recent watermark assigner before this window
                // and verified that it produces event-time watermarks. An earlier assigner in
                // the chain, e.g. one using noWatermarks(), can safely be ignored because a
                // valid event-time watermark assigner sits between it and this window.
                break;
            }
        }
    }
}
{code}
 

Then we can add the following check to the constructors of [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112] and [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79]:
{code:java}
// Only event-time window assigners require an upstream event-time watermark strategy.
if (windowAssigner.isEventTime()) {
    EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
}
{code}
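The control flow of the proposed check can be exercised without a Flink cluster using the hypothetical stand-ins below. Node and WatermarkNode model Transformation and TimestampsAndWatermarksTransformation. The boolean eventTime field models the proposed isEventTime(). buildWindowedStream() mirrors the guarded call in the constructor. None of these names exist in Flink; this is a sketch of the logic only.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical, Flink-independent model of the proposed constructor check. */
public final class PreconditionSketch {

    /** Stand-in for Transformation: only knows its direct inputs. */
    static class Node {
        final List<Node> inputs;

        Node(List<Node> inputs) {
            this.inputs = inputs;
        }
    }

    /** Stand-in for TimestampsAndWatermarksTransformation. */
    static final class WatermarkNode extends Node {
        final boolean eventTime; // result of the proposed isEventTime()

        WatermarkNode(Node input, boolean eventTime) {
            super(Collections.singletonList(input));
            this.eventTime = eventTime;
        }
    }

    /** Same control flow as the proposed hasPrecedingEventTimeGenerator(). */
    static void hasPrecedingEventTimeGenerator(List<Node> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            Node pre = predecessors.get(i);
            if (pre instanceof WatermarkNode) {
                if (!((WatermarkNode) pre).eventTime) {
                    throw new IllegalArgumentException(
                            "Event-time window after a watermark strategy that never advances event time");
                }
                break;
            }
        }
    }

    /** Mirrors the guarded call in the WindowedStream constructor. */
    static void buildWindowedStream(Node keyedInput, boolean assignerIsEventTime) {
        if (assignerIsEventTime) {
            hasPrecedingEventTimeGenerator(keyedInput.inputs);
        }
    }

    public static void main(String[] args) {
        Node source = new Node(Collections.emptyList());
        // keyBy() yields a partition node whose direct input is the watermark node.
        Node keyedNoWatermarks = new Node(Collections.singletonList(new WatermarkNode(source, false)));
        Node keyedMonotonous = new Node(Collections.singletonList(new WatermarkNode(source, true)));

        buildWindowedStream(keyedMonotonous, true);    // passes
        buildWindowedStream(keyedNoWatermarks, false); // processing-time window: not checked
        try {
            buildWindowedStream(keyedNoWatermarks, true);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The model makes one property of the check visible: it only inspects the direct inputs of the keyed stream's transformation. A pipeline like assignTimestampsAndWatermarks(noWatermarks()).map(...).keyBy(...) would therefore pass, since the map node sits between the watermark node and the window. If that case should also be rejected, walking further upstream might be worth considering.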
This is the approach I currently have in mind, but I am not sure whether it is the best one.

Best regards, 

Dario

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
