Hi,
I read the following in the Flink docs: "We can explicitly specify a Trigger
to overwrite the default Trigger provided by the WindowAssigner. Note that
specifying a trigger does not add an additional trigger condition but
replaces the current trigger."
So I tested the code below with a count trigger. As I understand it, this
overrides the default watermark-based (event-time) trigger.
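import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // windows use event time
val f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // assumed definition of f: parses the timestamp field

// (timestamp, value, key); note the timestamps are not strictly ascending
val testStream = env.fromCollection(List(
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 111283, 23),
    ("2016-04-07 13:11:57", 108042, 23),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:11:59", 136505, 4)
  ))
  .assignAscendingTimestamps(b => f.parse(b._1).getTime())
  .map(b => (b._3, b._2)) // (key, value)
testStream.print()

val countStream = testStream
  .keyBy(_._1)
  .timeWindow(Time.seconds(20))
  .trigger(CountTrigger.of(3)) // replaces the default event-time trigger
  // accumulate (key, all values seen in the window)
  .fold((0, List[Int]())) { case ((k, r), i) => (i._1, r ++ List(i._2)) }
countStream.print()

env.execute()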
The output confirms the documented behavior: a window fires only once a key
has 3 elements in that window.
How do I force the leftover records to be emitted once the watermark passes
the end of the window? That is, I want to use a trigger to start processing
early, but still finalize the window based on the watermark.
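What I have in mind is roughly the firing logic below, sketched in plain Scala for a single (key, window) pane. All names here (Result, Continue, Fire, FireAndPurge, EarlyCountThenWatermark, TriggerReplay) are hypothetical stand-ins, not Flink classes, and watermarks are modeled as plain Long values.

```scala
// Local stand-ins for Flink's TriggerResult values (not the Flink classes).
sealed trait Result
case object Continue extends Result
case object Fire extends Result         // emit the pane and keep its contents
case object FireAndPurge extends Result // emit the pane, then drop its contents

// Hypothetical trigger for one (key, window) pane: fire early every maxCount
// elements, and fire a final time once the watermark reaches the window's last timestamp.
final class EarlyCountThenWatermark(maxCount: Int, windowMaxTimestamp: Long) {
  private var count = 0
  private var finished = false

  // Per element: behaves like a count trigger (count resets after each early firing).
  def onElement(): Result = {
    count += 1
    if (count >= maxCount) { count = 0; Fire } else Continue
  }

  // Per watermark: behaves like an event-time trigger; this is what finalizes the pane.
  def onWatermark(watermark: Long): Result =
    if (!finished && watermark >= windowMaxTimestamp) { finished = true; FireAndPurge }
    else Continue
}

object TriggerReplay {
  // Feeds n elements to a fresh trigger, then one watermark; keeps only the firings.
  def replay(n: Int, maxCount: Int, windowMaxTimestamp: Long, watermark: Long): List[Result] = {
    val t = new EarlyCountThenWatermark(maxCount, windowMaxTimestamp)
    val fromElements = List.fill(n)(t.onElement())
    (fromElements :+ t.onWatermark(watermark)).filterNot(_ == Continue)
  }
}
```

With maxCount = 3, a pane holding 3 elements (like key 4) would yield List(Fire, FireAndPurge), and a pane holding only 2 (like keys 23 and 9) would still yield List(FireAndPurge) once the watermark passes. If I read the Trigger API correctly, this would map onto a custom Trigger whose onElement counts like CountTrigger and also registers an event-time timer for window.maxTimestamp(), and whose onEventTime returns FIRE_AND_PURGE; I have not checked that against a specific Flink version.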
The job's output shows that the windows for keys 23 and 9 never fired:
(4,157428)
(4,157428)
(23,111283)
(23,108042)
(9,161374)
(9,161374)
(4,136505)
(4,List(157428, 157428, 136505))
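To double-check, I tallied the input per (key, window) pane in plain Scala. This is a hypothetical helper (PaneTally is not Flink code), assuming 20-second tumbling windows aligned to the epoch (start = ts - ts % size) and a count that resets after each firing, as CountTrigger does.

```scala
import java.text.SimpleDateFormat

// Hypothetical, Flink-free tally of the job's input per (key, window) pane.
object PaneTally {
  val windowSizeMs = 20000L // Time.seconds(20)
  val maxCount = 3          // CountTrigger.of(3)

  // (timestamp, value, key), same records as the job
  val input = List(
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 157428, 4),
    ("2016-04-07 13:11:59", 111283, 23),
    ("2016-04-07 13:11:57", 108042, 23),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:12:00", 161374, 9),
    ("2016-04-07 13:11:59", 136505, 4)
  )

  // Start of the tumbling window containing ts (assumed epoch-aligned).
  def windowStart(ts: Long): Long = ts - ts % windowSizeMs

  // Values per (key, windowStart), in arrival order.
  def panes: Map[(Int, Long), List[Int]] = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    input
      .map { case (time, value, key) => ((key, windowStart(fmt.parse(time).getTime)), value) }
      .groupBy(_._1)
      .map { case (pane, vs) => pane -> vs.map(_._2) }
  }

  // A count-only trigger fires a pane only if it ever reaches maxCount elements.
  def firedKeys: Set[Int] =
    panes.collect { case ((key, _), vs) if vs.size >= maxCount => key }.toSet

  // Keys whose panes never reach maxCount, so nothing is ever emitted for them.
  def silentKeys: Set[Int] = panes.keySet.map(_._1) -- firedKeys
}
```

Key 4 has 3 elements in its window, while keys 23 and 9 have only 2 each, so with the event-time trigger replaced nothing ever emits them.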
Thanks,
Srikanth