Maybe the last example of this blog post is helpful [1].
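
For the "fire early on count, finalize on watermark" behavior asked about below, a custom trigger along the following lines might work. This is only an untested sketch, assuming the Trigger API roughly as it looked around Flink 1.0; method signatures may differ in other versions. CountOrWatermarkTrigger and SumLongs are names made up for illustration, not Flink classes.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical helper: sums the per-key, per-window element counter.
class SumLongs extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(a.longValue + b.longValue)
}

// Hypothetical trigger: fires early every `maxCount` elements and fires
// once more (purging the window) when the watermark passes the window end.
class CountOrWatermarkTrigger[T](maxCount: Long) extends Trigger[T, TimeWindow] {

  private val countDesc = new ReducingStateDescriptor[java.lang.Long](
    "count", new SumLongs, LongSerializer.INSTANCE)

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: TriggerContext): TriggerResult = {
    // Register a timer so the window is finalized at the watermark.
    ctx.registerEventTimeTimer(window.maxTimestamp())

    val count = ctx.getPartitionedState(countDesc)
    count.add(1L)
    if (count.get().longValue >= maxCount) {
      count.clear()
      TriggerResult.FIRE // early result; window contents are kept
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: TriggerContext): TriggerResult =
    // Final result: emit whatever is in the window and drop its contents.
    if (time == window.maxTimestamp()) TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: TriggerContext): Unit =
    ctx.getPartitionedState(countDesc).clear()
}
```

It would replace CountTrigger.of(3) in the job, i.e. .trigger(new CountOrWatermarkTrigger[(Int, Int)](3)). Because the early firings use FIRE rather than FIRE_AND_PURGE, each one should emit the fold result accumulated so far for that window; returning FIRE_AND_PURGE from onElement instead would give disjoint batches.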

Best, Fabian

[1]
https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink

2016-05-10 17:24 GMT+02:00 Srikanth <srikanth...@gmail.com>:

> Hi,
>
> I read the following in the Flink docs: "We can explicitly specify a Trigger to
> overwrite the default Trigger provided by the WindowAssigner. Note that
> specifying a trigger does not add an additional trigger condition but
> replaces the current trigger."
> So I tested the code below with a count trigger. As I understand it,
> this overrides the default watermark-based trigger.
>
> // Assumption: f is a date parser defined elsewhere, e.g.
> // new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
> val testStream = env.fromCollection(List(
>     ("2016-04-07 13:11:59", 157428, 4),
>     ("2016-04-07 13:11:59", 157428, 4),
>     ("2016-04-07 13:11:59", 111283, 23),
>     ("2016-04-07 13:11:57", 108042, 23),
>     ("2016-04-07 13:12:00", 161374, 9),
>     ("2016-04-07 13:12:00", 161374, 9),
>     ("2016-04-07 13:11:59", 136505, 4)
>   ))
>   .assignAscendingTimestamps(b => f.parse(b._1).getTime())
>   .map(b => (b._3, b._2))
>
> testStream.print
>
> val countStream = testStream
>   .keyBy(_._1)
>   .timeWindow(Time.seconds(20))
>   .trigger(CountTrigger.of(3))
>   .fold((0, List[Int]())) { case ((k, r), i) => (i._1, r ++ List(i._2)) }
>
> countStream.print
>
> The output confirms the documented behavior: processing is triggered
> only when a key has 3 elements.
> How do I force the leftover records to be processed once the watermark
> passes the end of the window? I.e., I want to use triggers to start early
> processing but finalize the window based on the watermark.
>
> The output shows that the records for keys 23 and 9 were never processed:
>   (4,157428)
>   (4,157428)
>   (23,111283)
>   (23,108042)
>   (9,161374)
>   (9,161374)
>   (4,136505)
>
>   (4,List(157428, 157428, 136505))
>
> Thanks,
> Srikanth
>
