The problem here is that the ContinuousEventTimeTrigger is kind of broken. It relies on the first element to register a future timer, but event time might not progress that far before the window ends. It should additionally fire at the end of the window.
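To make the timer problem concrete, here is a rough sketch of the arithmetic only. It assumes the trigger rounds the first element's timestamp down to a multiple of the interval and registers a single timer one interval later; the class and method names are made up for illustration and are not Flink API. Under that assumption, an element stamped with the last millisecond of a 30 s window (as an upstream window result might be) gets a timer that lies past the end of the window.

```java
// Sketch only: models the timer arithmetic under the assumptions stated above.
// None of these names exist in Flink; they are hypothetical helpers.
final class ContinuousTimerSketch {

    // Assumed behaviour: round the first element's timestamp down to an
    // interval boundary and schedule the first timer one interval later.
    static long nextFireTimestamp(long elementTimestamp, long intervalMillis) {
        long start = elementTimestamp - (elementTimestamp % intervalMillis);
        return start + intervalMillis;
    }

    // Last timestamp that still belongs to a window ending (exclusively) at windowEnd.
    static long maxTimestamp(long windowEndExclusive) {
        return windowEndExclusive - 1;
    }

    // True when the first timer lies past the window's last timestamp,
    // i.e. the window can end before that timer is ever reached.
    static boolean firstTimerMissesWindow(long elementTimestamp,
                                          long intervalMillis,
                                          long windowEndExclusive) {
        return nextFireTimestamp(elementTimestamp, intervalMillis)
                > maxTimestamp(windowEndExclusive);
    }

    public static void main(String[] args) {
        long interval = 10_000L;  // 10 s trigger interval, as in the original topology
        long windowEnd = 30_000L; // tumbling window covering [0 s, 30 s)

        // Element early in the window: first timer at 10 s, inside the window.
        System.out.println(firstTimerMissesWindow(1_000L, interval, windowEnd));  // false

        // Element at the window's last millisecond: first timer at 30 s, past the window.
        System.out.println(firstTimerMissesWindow(29_999L, interval, windowEnd)); // true
    }
}
```

An extra timer at the window's last timestamp would cover the second case, which is the idea behind firing at the end of the window as well.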
Here is a version with an improved continuous trigger:
https://gist.github.com/mxm/a1d6b22c772971c98e2ce886dc9818b1?ts=2

By the way, if you remove the ContinuousEventTimeTrigger (which will
implicitly set a regular EventTimeTrigger) for the CoGroup, it also
works fine. I don't know whether you really want early firings there.

Cheers,
Max

PS: Final word on the cleanup. The state should always be cleaned up at
the end of the window + the allowedLateness you have set.

On Tue, Nov 22, 2016 at 11:08 PM, William Saar <will...@saar.se> wrote:
> Thanks!
> One difference is that my topology had 2 sources. I have updated your
> example to also use 2 sources and that breaks the co-group operation in the
> example as well!
>
> https://gist.github.com/saarw/8f9513435a41ab29b36da77c16a8b0ed
>
> Nice to know that purging can be added to the event trigger.
>
> William
>
>
> ----- Original Message -----
> From: user@flink.apache.org
> To: "user@flink.apache.org" <user@flink.apache.org>
> Cc:
> Sent: Tue, 22 Nov 2016 11:50:52 +0100
> Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
>
> Hi William,
>
> I've reproduced your example locally for some toy data and everything
> was working as expected (with the early triggering). So I'm assuming
> either there is something wrong with your input data or the behavior
> doesn't always manifest.
>
> Here's the example I ran in case you want to try:
> https://gist.github.com/mxm/6cb1e6e9a572a26df76917176849f405
>
> Gyula is right, the ContinuousEventTimeTrigger never purges the window,
> but you can circumvent that by extending this trigger and purging
> at the end of the window, similar to what is done in the EventTimeTrigger.
>
> -Max
>
>
> On Mon, Nov 21, 2016 at 6:52 PM, William Saar <will...@saar.se> wrote:
>> Thanks!
>>
>> Yes, the SlidingEventTimeWindow works, but is there any way to
>> pre-aggregate things with tumbling windows that emit events more often
>> than their window size?
>> Perhaps I can do this before I merge the streams? (But if
>> ContinuousEventTimeTrigger is the only way to do that, it's bad if it
>> doesn't clean up its state).
>>
>> I assume sliding windows will need too much state and be less efficient
>> than tumbling windows, as a sliding fold needs to keep all events in the
>> window and recompute the fold as events slide out of the window, while a
>> tumbling fold just needs to keep the aggregation and can discard events as
>> it folds them.
>>
>> I am reviewing how one would replace a batch solution based on 3 bucketed
>> aggregations of different window sizes and it seems tumbling windows would
>> be a perfect fit and would need to keep only the 3 aggregations in memory,
>> while sliding windows would need to keep up to 3 copies of all events (for
>> at least the smallest window size) to compute the same type of results.
>>
>> Regards!
>> William
>>
>>
>> ----- Original Message -----
>> From: user@flink.apache.org
>> To: <user@flink.apache.org>
>> Cc:
>> Sent: Mon, 21 Nov 2016 08:22:16 +0000
>> Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
>>
>> Hi William,
>>
>> I am wondering whether the ContinuousEventTimeTrigger is the best choice
>> here (it never cleans up the state as far as I know).
>>
>> Have you tried the simple SlidingEventTimeWindows as your window function?
>>
>> Cheers,
>> Gyula
>>
>> William Saar <will...@saar.se> wrote (on Sat, Nov 19, 2016, 18:28):
>>>
>>> Hi!
>>>
>>> My topology below seems to work when I comment out all the lines with
>>> ContinuousEventTimeTrigger, but prints nothing when the line is in there.
>>> Can I coGroup two large time windows that use a different trigger time
>>> than the window size? (Even if the ContinuousEventTimeTrigger doesn't work
>>> for coGroups, I would not expect the result to be completely silent.)
>>>
>>> The streams I'm cogrouping are from 2 different Kafka sources and use
>>> event time with 0 out-of-orderness, and I'm on Flink 1.1.3, if that helps.
>>>
>>> DataStream<CommonType> stream1 =
>>>     <stream of event type1>
>>>     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>>     .fold(...);
>>>
>>> DataStream<CommonType> stream2 =
>>>     <stream of event type2>
>>>     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>>     .fold(...);
>>>
>>> stream1.coGroup(stream2).where(...).equalTo(...)
>>>     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>>     .print()
>>>
>>> Thanks,
>>>
>>> William