That makes sense, thank you, Hequn. I can see the tradeoff between using allowedLateness on a window to trigger multiple firings, versus a window with a watermark lagging some amount of time (e.g. 3 hours) that has only a single firing.
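To make that tradeoff concrete, here is a rough plain-Java toy model of the two setups. It is only a sketch and has no Flink dependency: the names `LateFiringSketch`, `countFirings` and `sampleEvents` are made up for illustration. It assumes a single 1-hour window starting at time 0 and a watermark that is recomputed after every element, whereas Flink emits watermarks periodically. It also assumes the default event-time trigger behavior Hequn describes: one firing when the watermark passes the end of the window, then one more for each late element that arrives within the allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;

public class LateFiringSketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;

    /**
     * Counts how often the single window [0, 1h) fires for the given event
     * timestamps, listed in arrival order. Toy model, not Flink code.
     */
    static int countFirings(List<Long> eventTimes, long watermarkLag, long allowedLateness) {
        long windowMaxTs = HOUR - 1;        // last timestamp that belongs to the window
        long maxSeen = Long.MIN_VALUE;      // highest event timestamp seen so far
        long watermark = Long.MIN_VALUE;    // maxSeen minus the configured lag
        boolean hasContents = false;
        boolean firedOnce = false;
        int firings = 0;

        for (long ts : eventTimes) {
            boolean inWindow = ts >= 0 && ts <= windowMaxTs;
            // The window is dropped once the watermark passes its end plus the allowed lateness.
            boolean expired = watermark >= windowMaxTs + allowedLateness;
            if (inWindow && !expired) {
                hasContents = true;
                if (firedOnce) {
                    firings++;              // late element: the window fires again
                }
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - watermarkLag;
            if (!firedOnce && hasContents && watermark >= windowMaxTs) {
                firedOnce = true;           // watermark passed the end of the window
                firings++;
            }
        }
        return firings;
    }

    /** Two on-time records, one record that advances time, 1000 late records, one final record. */
    static List<Long> sampleEvents() {
        List<Long> events = new ArrayList<>();
        events.add(10 * MINUTE);
        events.add(30 * MINUTE);
        events.add(65 * MINUTE);            // belongs to the next window
        for (int i = 0; i < 1000; i++) {
            events.add(50 * MINUTE);        // late records for the first window
        }
        events.add(250 * MINUTE);           // arrives much later, moves the watermark on
        return events;
    }

    public static void main(String[] args) {
        List<Long> events = sampleEvents();
        System.out.println("allowedLateness(3h), no watermark lag: "
                + countFirings(events, 0, 3 * HOUR) + " firings");
        System.out.println("3h watermark lag, no allowedLateness: "
                + countFirings(events, 3 * HOUR, 0) + " firings");
    }
}
```

With this sample input the first setup fires 1001 times (once on time, then once per late record) and the second fires exactly once, but only after an element more than 3 hours past the end of the window has been seen. That delay is the latency cost.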
Thanks again,

--
Scott Kidder

On Fri, Oct 19, 2018 at 7:51 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Scott,
>
> Yes, the window trigger fires for every single late element.
>
> If you only want the window to be triggered once, you can:
> - Remove the allowedLateness()
> - Use BoundedOutOfOrdernessTimestampExtractor to emit Watermarks that
>   lag behind the element.
>
> The code (Scala) looks like:
>
>> class TimestampExtractor[T1, T2]
>>   extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](
>>     Time.hours(3)) {
>>   override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
>>     element._3.getTime
>>   }
>> }
>
> Note that this will increase the latency, since the window fires only
> once, when the lagging watermark passes the end of the window.
>
> Best, Hequn
>
> On Sat, Oct 20, 2018 at 1:15 AM Scott Kidder <kidder.sc...@gmail.com>
> wrote:
>
>> I'm using event-time windows of 1 hour that have an allowed lateness of
>> several hours. This supports the processing of access logs that can be
>> delayed by several hours. The windows aggregate data over the 1 hour period
>> and write to a database sink. Pretty straightforward.
>>
>> Will the event-time trigger lead to the window trigger firing for every
>> single late element? Suppose thousands of late elements arrive
>> simultaneously; I'd like to avoid having that lead to thousands of database
>> updates in a short period of time. Ideally, I could batch up the late
>> window changes and have it trigger when the window is finally closed or
>> some processing-time duration passes (e.g. once per minute).
>>
>> For reference, here's what the aggregate window definition looks like
>> with Flink 1.5.3:
>>
>> chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
>>     .timeWindow(Time.hours(1))
>>     .allowedLateness(Time.hours(3))
>>     .aggregate(new EnvironmentAggregateWatchTimeFunction())
>>     .uid("env-watchtime-stats")
>>     .name("Env Watch-Time Stats")
>>     .addSink(new EnvironmentWatchTimeDBSink());
>>
>>
>> Thank you,
>>
>> --
>> Scott Kidder
>>
>