Note that the built-in `BoundedOutOfOrdernessTimestampExtractor` generates watermarks based only on the timestamp of incoming events. Without new events, `BoundedOutOfOrdernessTimestampExtractor` will not advance the event-time clock. That explains why the window doesn't trigger immediately after 10s; a new event must arrive to advance the watermark.
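To make the stalled watermark concrete, here is a small plain-Java sketch that models the behaviour without any Flink dependency. All class and method names in it are made up for illustration. Only the rule "watermark = highest timestamp seen so far minus the out-of-orderness bound" is meant to mirror the built-in extractor. The second class is a hypothetical alternative, not something Flink ships: it additionally advances the watermark by the wall-clock time that has passed since the last event.

```java
public class WatermarkModel {

    /** Models the event-driven rule: watermark = highest timestamp seen - bound. */
    static final class EventDrivenWatermark {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampMs;

        EventDrivenWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Offset the start value so the subtraction below cannot underflow.
            this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
        }

        void onEvent(long eventTimestampMs) {
            maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        }

        /** Only changes when onEvent() has seen a higher timestamp. */
        long currentWatermark() {
            return maxTimestampMs - maxOutOfOrdernessMs;
        }
    }

    /** Hypothetical variant: also adds the wall-clock time elapsed since the last event. */
    static final class IdleAwareWatermark {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampMs;
        private long lastEventWallClockMs;
        private long lastEmittedMs = Long.MIN_VALUE;
        private boolean seenEvent = false;

        IdleAwareWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        }

        void onEvent(long eventTimestampMs, long wallClockMs) {
            maxTimestampMs = seenEvent ? Math.max(maxTimestampMs, eventTimestampMs) : eventTimestampMs;
            lastEventWallClockMs = wallClockMs;
            seenEvent = true;
        }

        /** The wall clock is passed in so the model stays deterministic. */
        long currentWatermark(long wallClockMs) {
            if (seenEvent) {
                long idleMs = Math.max(0L, wallClockMs - lastEventWallClockMs);
                long candidate = maxTimestampMs - maxOutOfOrdernessMs + idleMs;
                // Never move backwards: watermarks must be non-decreasing.
                lastEmittedMs = Math.max(lastEmittedMs, candidate);
            }
            return lastEmittedMs;
        }
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        EventDrivenWatermark eventDriven = new EventDrivenWatermark(100);
        eventDriven.onEvent(5_000);
        System.out.println("event-driven, no further events: fires="
                + windowFires(eventDriven.currentWatermark(), 10_000));

        IdleAwareWatermark idleAware = new IdleAwareWatermark(100);
        idleAware.onEvent(5_000, 0);
        System.out.println("idle-aware, 6 s of wall-clock time later: fires="
                + windowFires(idleAware.currentWatermark(6_000), 10_000));
    }
}
```

In a real job, logic like this would sit in the `getCurrentWatermark()` method of an `AssignerWithPeriodicWatermarks`, which Flink calls periodically at the configured auto-watermark interval. Keep in mind that pushing the watermark forward by wall-clock time means events that show up later with older timestamps may be treated as late.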
As Timo said, a different implementation of `AssignerWithPeriodicWatermarks` could behave differently. Have a look at the implementation of `TimestampsAndPeriodicWatermarksOperator` within Flink to better understand how `AssignerWithPeriodicWatermarks` is invoked.

On Wed, Aug 2, 2017 at 8:13 AM, Govindarajan Srinivasaraghavan <govindragh...@gmail.com> wrote:

> Thanks Timo. Basically, my requirement is that the window has to be
> created based on event time, but the trigger has to fire either when the
> next element with a timestamp >10s has been received or when 10s have
> passed. Exactly the same way as you described. Let me try the
> AssignerWithPeriodicWatermarks approach.
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 7:46 AM, Timo Walther <twal...@apache.org> wrote:
>
> I forgot about the AssignerWithPeriodicWatermarks [1]. I think it could
> solve your problem easily.
>
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#with-periodic-watermarks
>
> On 02.08.17 at 16:30, Timo Walther wrote:
>
> The question is what defines your `10 seconds`. In event time, the incoming
> events determine when 10 seconds have passed. Your description sounds like
> you want to have results after 10 seconds of wall-clock/processing time. So
> either you use a processing-time window or you implement a custom trigger
> that fires either on event time or on a timer that you have set after 10 s
> of processing time.
>
> Timo
>
> On 02.08.17 at 16:20, Govindarajan Srinivasaraghavan wrote:
>
> Thanks Timo. The next message will arrive only after a minute or so. Is
> there a way to evict whatever is in the window buffer just after 10
> seconds, irrespective of whether a new message arrives or not?
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 6:56 AM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Govind,
>
> if the window is not triggered, this usually indicates that your timestamp
> and watermark assignment is not correct. According to your description, I
> don't think that you need a custom trigger/evictor. How often do events
> arrive from one device? There must be another event from the same device
> that has a timestamp >10s in order to trigger the window evaluation.
>
> Instead of using the Kafka timestamp, maybe you could also convert your
> timestamps to UTC in the TimestampExtractor.
>
> There is no official limitation. However, each window comes with some
> overhead. So you should choose your memory/state backends and parallelism
> accordingly.
>
> Hope that helps.
>
> Timo
>
> On 02.08.17 at 06:54, Govindarajan Srinivasaraghavan wrote:
>
> Hi,
>
> I have a few questions regarding event-time windowing. My scenario is that
> devices in various time zones will send messages with a timestamp, and I
> need to create a 10-second window per device. The messages will mostly
> arrive in order.
>
> Here is my sample code to perform the windowing and aggregate the messages
> after the window for further processing.
>
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
>         new DeserializationSchema(),
>         kafkaConsumerProperties);
>
> // TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor
> DataStream<Message> msgStream = streamEnv
>         .addSource(consumer)
>         .assignTimestampsAndWatermarks(
>                 new TimestampExtractor(Time.of(100, TimeUnit.MILLISECONDS)));
>
> KeyedStream<Message, String> keyByStream =
>         msgStream.keyBy(new CustomKeySelector());
>
> WindowedStream<Message, String, TimeWindow> windowedStream =
>         keyByStream.window(TumblingEventTimeWindows.of(
>                 org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
>
> SingleOutputStreamOperator<Message> aggregatedStream =
>         windowedStream.apply(new AggregateEntries());
>
> My questions are:
>
> - In the above code, data gets passed up to the window function, but even
>   after the window time has elapsed the data is not received in the apply
>   function. Do I have to supply a custom evictor or trigger?
>
> - Since the data is being received from multiple time zones and each device
>   will have some time difference, would it be OK to assign the timestamp as
>   that of the received timestamp in the message at the source (Kafka)? Will
>   there be any issues with this?
>
> - Are there any limitations on the number of time windows that can be
>   created at any given time? In my scenario, if there are 1 million devices
>   there will be 1 million tumbling windows.
>
> Thanks,
> Govind