Maybe the last example of this blog post is helpful [1].

Best, Fabian

[1] https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink

2016-05-10 17:24 GMT+02:00 Srikanth <srikanth...@gmail.com>:

> Hi,
>
> I read the following in Flink doc "We can explicitly specify a Trigger to
> overwrite the default Trigger provided by the WindowAssigner. Note that
> specifying a triggers does not add an additional trigger condition but
> replaces the current trigger."
> So, I tested out the below code with count trigger. As per my
> understanding this will override the default watermark based trigger.
>
> val testStream = env.fromCollection(List(
>     ("2016-04-07 13:11:59", 157428, 4),
>     ("2016-04-07 13:11:59", 157428, 4),
>     ("2016-04-07 13:11:59", 111283, 23),
>     ("2016-04-07 13:11:57", 108042, 23),
>     ("2016-04-07 13:12:00", 161374, 9),
>     ("2016-04-07 13:12:00", 161374, 9),
>     ("2016-04-07 13:11:59", 136505, 4)
>   )
> )
>   .assignAscendingTimestamps(b => f.parse(b._1).getTime())
>   .map(b => (b._3, b._2))
>
> testStream.print
>
> val countStream = testStream
>   .keyBy(_._1)
>   .timeWindow(Time.seconds(20))
>   .trigger(CountTrigger.of(3))
>   .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>
> countStream.print
>
> Output I saw confirms the documented behavior. Processing is triggered
> only when we have 3 elements for a key.
> How do I force trigger the left over records when watermark is past the
> window? I.e, I want to use triggers to start early processing but finalize
> the window based on watermark.
>
> Output shows that records for keys 23 & 9 weren't processed.
> (4,157428)
> (4,157428)
> (23,111283)
> (23,108042)
> (9,161374)
> (9,161374)
> (4,136505)
>
> (4,List(157428, 157428, 136505))
>
> Thanks,
> Srikanth
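
For readers of this thread, here is a minimal sketch of the behavior asked about in the quoted message: fire early every N elements per key, then fire once more when the watermark passes the window. It is plain Scala and deliberately does not use the Flink API. The names `Decision`, `CountOrWatermarkTrigger`, `run` and `sample` are hypothetical, and the sketch assumes all seven records fall into a single window, which may not match how Flink would actually assign them. In Flink, the same two checks would presumably go into a custom `Trigger`, with the count check in `onElement` and the final firing in `onEventTime`.

```scala
import scala.collection.mutable

// Flink-free model of "fire early on count, finalize on watermark".
// All names here are hypothetical and only mirror the idea of a trigger.
object EarlyFiringSketch {
  sealed trait Decision
  case object Continue extends Decision
  case object Fire extends Decision         // emit window contents, keep them
  case object FireAndPurge extends Decision // emit window contents, then drop them

  // Trigger state for one key in one window.
  final class CountOrWatermarkTrigger(maxCount: Int, windowMaxTimestamp: Long) {
    private var sinceLastFire = 0

    // Early firing: every maxCount elements.
    def onElement(): Decision = {
      sinceLastFire += 1
      if (sinceLastFire >= maxCount) { sinceLastFire = 0; Fire } else Continue
    }

    // Final firing: once the watermark reaches the end of the window.
    def onWatermark(watermark: Long): Decision =
      if (watermark >= windowMaxTimestamp) FireAndPurge else Continue
  }

  // The (key, value) pairs printed by testStream in the quoted message.
  val sample: List[(Int, Int)] = List(
    (4, 157428), (4, 157428), (23, 111283), (23, 108042),
    (9, 161374), (9, 161374), (4, 136505))

  // Feeds all records (assumed to share one window), then advances the watermark.
  def run(records: Seq[(Int, Int)], maxCount: Int,
          windowMaxTimestamp: Long, finalWatermark: Long): List[(Int, List[Int])] = {
    val triggers = mutable.LinkedHashMap.empty[Int, CountOrWatermarkTrigger]
    val contents = mutable.LinkedHashMap.empty[Int, List[Int]]
    val out = mutable.ArrayBuffer.empty[(Int, List[Int])]

    for ((key, value) <- records) {
      val trigger = triggers.getOrElseUpdate(
        key, new CountOrWatermarkTrigger(maxCount, windowMaxTimestamp))
      contents(key) = contents.getOrElse(key, Nil) :+ value
      if (trigger.onElement() == Fire) out += (key -> contents(key))
    }

    for ((key, trigger) <- triggers) {
      if (trigger.onWatermark(finalWatermark) == FireAndPurge) {
        out += (key -> contents(key))
        contents.remove(key)
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit =
    run(sample, maxCount = 3, windowMaxTimestamp = 19999L, finalWatermark = 19999L)
      .foreach(println)
}
```

With the sample data, key 4 is emitted once early (on its third element), and then keys 4, 23 and 9 are all emitted at the final firing. Because the early firing keeps the window contents, key 4 appears twice with the same list. Purging on the early firing instead would make each firing carry only the elements since the previous one.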