The problem here is that the ContinuousEventTimeTrigger is kind of
broken. It relies on the first element to register a future timer, but
event time might never progress that far. It should additionally fire
at the end of the window.
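
To make the timer problem concrete, here is a rough model in plain Java.
It is not Flink code and not the trigger's real implementation; the class
and method names are made up for illustration. It assumes the first timer
is set to the next interval boundary after the first element, and that
timers past the last timestamp of the window never produce output.

```java
import java.util.TreeSet;

// Simplified model of the timers a continuous event-time trigger sets for
// one window. Hypothetical names; this is not the Flink implementation.
class ContinuousTimerSketch {

    static TreeSet<Long> timers(long windowStart, long windowSize, long interval,
                                long firstElementTs, boolean fireAtWindowEnd) {
        long maxTimestamp = windowStart + windowSize - 1; // last timestamp inside the window
        TreeSet<Long> result = new TreeSet<>();
        // The first timer is the next interval boundary after the first element.
        long next = firstElementTs - (firstElementTs % interval) + interval;
        while (next <= maxTimestamp) {
            result.add(next);
            next += interval;
        }
        // The proposed fix: always fire once more at the end of the window.
        if (fireAtWindowEnd) {
            result.add(maxTimestamp);
        }
        return result;
    }

    public static void main(String[] args) {
        // 30 s tumbling window, 10 s interval, first element arrives at 25 s.
        System.out.println(timers(0, 30_000, 10_000, 25_000, false)); // prints []
        System.out.println(timers(0, 30_000, 10_000, 25_000, true));  // prints [29999]
    }
}
```

With a late first element, the model sets no timer inside the window
unless the end-of-window timer is added, which would match the silent
output.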

Here is a version with an improved continuous trigger:
https://gist.github.com/mxm/a1d6b22c772971c98e2ce886dc9818b1?ts=2

By the way, if you remove the ContinuousEventTimeTrigger (which will
implicitly set a regular EventTimeTrigger) for the CoGroup, it also
works fine. I don't know whether you really want early firings there.

Cheers,
Max

PS: Final word on the cleanup. The state should always be cleaned up
at the end of the window + allowedLateness you have set.
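
As a quick worked example of that cleanup time, here is a small sketch in
plain Java. The helper is hypothetical and not part of the Flink API. It
assumes the window end is the last timestamp inside the window and that
allowedLateness is non-negative.

```java
// Hypothetical helper, not Flink API: computes when window state would be
// dropped, assuming cleanup happens at window end plus allowed lateness.
class CleanupTimeSketch {

    static long cleanupTime(long windowStart, long windowSize, long allowedLateness) {
        long maxTimestamp = windowStart + windowSize - 1; // last timestamp inside the window
        long cleanup = maxTimestamp + allowedLateness;
        // Guard against overflow for very large lateness values.
        return cleanup >= maxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // 30 s window starting at 0 with 5 s allowed lateness.
        System.out.println(cleanupTime(0, 30_000, 5_000)); // prints 34999
    }
}
```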

On Tue, Nov 22, 2016 at 11:08 PM, William Saar <will...@saar.se> wrote:
> Thanks!
> One difference is that my topology had 2 sources. I have updated your
> example to also use 2 sources and that breaks the co-group operation in the
> example as well!
>
> https://gist.github.com/saarw/8f9513435a41ab29b36da77c16a8b0ed
>
> Nice to know that purging can be added to the event trigger.
>
> William
>
>
> ----- Original Message -----
> From:
> user@flink.apache.org
>
> To:
> "user@flink.apache.org" <user@flink.apache.org>
> Cc:
>
> Sent:
> Tue, 22 Nov 2016 11:50:52 +0100
>
> Subject:
> Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
>
>
> Hi William,
>
> I've reproduced your example locally for some toy data and everything
> was working as expected (with the early triggering). So I'm assuming
> either there is something wrong with your input data or the behavior
> doesn't always manifest.
>
> Here's the example I ran in case you want to try:
> https://gist.github.com/mxm/6cb1e6e9a572a26df76917176849f405
>
> Gyula is right, the ContinuousEventTimeTrigger never purges the window,
> but you can circumvent that by extending this trigger and purging
> at the end of the window, similarly to how it is done in the EventTimeTrigger.
>
> -Max
>
>
> On Mon, Nov 21, 2016 at 6:52 PM, William Saar <will...@saar.se> wrote:
>> Thanks!
>>
>> Yes, the SlidingEventTimeWindow works, but is there any way to
>> pre-aggregate
>> things with tumbling windows that emit events more often than their window
>> size? Perhaps I can do this before I merge the streams? (But if
>> ContinuousEventTimeTrigger is the only way to do that, it's bad if it
>> doesn't clean up its state).
>>
>> I assume using sliding window states will be too large and less efficient
>> than tumbling windows as a sliding fold needs to keep all events in the
>> window and recompute the fold as events slide out of the window, while a
>> tumbling fold just needs to keep the aggregation and can discard events as
>> it folds them.
>>
>> I am reviewing how one would replace a batch solution based on 3 bucketed
>> aggregations of different window sizes and it seems tumbling windows would
>> be a perfect fit and would need to keep only the 3 aggregations in memory,
>> while sliding windows would need to keep up to 3 copies of all events (for
>> at least the smallest window size) to compute the same type of results.
>>
>> Regards!
>> William
>>
>>
>> ----- Original Message -----
>> From:
>> user@flink.apache.org
>>
>> To:
>> <user@flink.apache.org>
>> Cc:
>>
>> Sent:
>> Mon, 21 Nov 2016 08:22:16 +0000
>> Subject:
>> Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
>>
>>
>>
>> Hi William,
>>
>> I am wondering whether the ContinuousEventTimeTrigger is the best choice
>> here (it never cleans up the state as far as I know).
>>
>> Have you tried the simple SlidingEventTimeWindows as your window function?
>>
>> Cheers,
>> Gyula
>>
>> William Saar <will...@saar.se> wrote (on Sat, Nov 19, 2016,
>> 18:28):
>>>
>>> Hi!
>>>
>>> My topology below seems to work when I comment out all the lines with
>>> ContinuousEventTimeTrigger, but prints nothing when those lines are in there.
>>> Can I coGroup two large time windows that use a different trigger time
>>> than
>>> the window size? (even if the ContinuousEventTimeTrigger doesn't work for
>>> coGroups, I would not expect the result to be completely silent).
>>>
>>> The streams I'm co-grouping are from 2 different Kafka sources and use
>>> event time with 0 out-of-orderness, and I'm on Flink 1.1.3, if that helps.
>>>
>>> DataStream<CommonType> stream1 =
>>> <stream of event type1>
>>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>> .fold(...);
>>>
>>>
>>> DataStream<CommonType> stream2 =
>>> <stream of event type2>
>>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>> .fold(...);
>>>
>>>
>>>
>>> stream1.coGroup(stream2).where(...).equalTo(...)
>>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>> .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>> .print()
>>>
>>> Thanks,
>>>
>>> William
>>>
>>
