I imagine you are taking about CountTrigger, DeltaTrigger, and 
Continuous*Trigger. For these we never purge. They are a leftover artifact from 
an earlier approach to implementing windowing strategies that was inspired by 
IBM InfoSphere streams. Here, all triggers are essentially accumulating and 
elements are evicted by an evictor. This is very flexible but makes it hard to 
implement windowing code efficiently. If you are interested here is a Master 
Thesis that describes that earlier implementation: 

These triggers are problematic because they never purge window contents if you 
don’t have an evictor that does correct eviction. Also, they don’t allow 
incremental aggregation over elements as they arrive since you don’t know what 
will be the contents of the window until the trigger fires and the evictor 

So, as a short answer: the accumulating triggers never purge window state on 
their own. I hope this helps somehow.

> On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> Thanks Aljoscha, that's very enlightening.
> Can you please also explain what the default behaviour is? I.e. if I use one 
> if the accumulating inbuilt triggers, when does the state get purged? (With 
> your info I can now probably work things out, but you may give more insight :)
> Also, are there plans to add explicit lateness control to flink core?  (I'm 
> aware off the dataflow integration work )
> Thanks again,
> Andy
> On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> the window contents are stored in state managed by the window operator at all 
> times until they are purged by a Trigger returning PURGE from one of its 
> on*() methods.
> Out of the box, Flink does not have something akin to the lateness and 
> cleanup of Google Dataflow. You can, however implement it yourself using a 
> custom Trigger. This is an example that mimics Google Dataflow:
> public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
>    private static final long serialVersionUID = 1L;
>    private final boolean accumulating;
>    private final long allowedLateness;
>    private EventTimeTrigger(boolean accumulating, long allowedLateness) {
>       this.accumulating = accumulating;
>       this.allowedLateness = allowedLateness;
>    }
>    @Override
>    public TriggerResult onElement(Object element, long timestamp, TimeWindow 
> window, TriggerContext ctx) throws Exception {
>       ctx.registerEventTimeTimer(window.maxTimestamp());
>       return TriggerResult.CONTINUE;
>    }
>    @Override
>    public TriggerResult onEventTime(long time, TimeWindow window, 
> TriggerContext ctx) {
>       if (time == window.maxTimestamp()) {
>          if (accumulating) {
>             // register the cleanup timer if we are accumulating (and allow 
> lateness)
>             if (allowedLateness > 0) {
>                ctx.registerEventTimeTimer(window.maxTimestamp() + 
> allowedLateness);
>             }
>             return TriggerResult.FIRE;
>          } else {
>             return TriggerResult.FIRE_AND_PURGE;
>          }
>       } else if (time == window.maxTimestamp() + allowedLateness) {
>          return TriggerResult.PURGE;
>       }
>       return TriggerResult.CONTINUE;
>    }
>    @Override
>    public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) throws Exception {
>       return TriggerResult.CONTINUE;
>    }
>    @Override
>    public String toString() {
>       return "EventTimeTrigger()";
>    }
>    /**
>     * Creates an event-time trigger that fires once the watermark passes the 
> end of the window.
>     *
>     * <p>
>     * Once the trigger fires all elements are discarded. Elements that arrive 
> late immediately
>     * trigger window evaluation with just this one element.
>     */
>    public static EventTimeTrigger discarding() {
>       return new EventTimeTrigger(false, 0L);
>    }
>    /**
>     * Creates an event-time trigger that fires once the watermark passes the 
> end of the window.
>     *
>     * <p>
>     * This trigger will not immediately discard all elements once it fires. 
> Only after the
>     * watermark passes the specified lateness are the window elements 
> discarded, without
>     * emitting a new result. If a late element arrives within the specified 
> lateness
>     * the window is computed again and a new result is emitted.
>     */
>    public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
>       return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
>    }
> }
> You can specify a lateness and while that time is not yet reached the windows 
> will remain and late arriving elements will trigger window emission with the 
> complete window contents.
> Cheers,
> Aljoscha
> > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> >
> > Hi,
> >
> > I'm trying to understand how the lifecycle of messages / state is managed 
> > by Flink, but I'm failing to find any documentation.
> >
> > Specially, if I'm using a windowed stream and a type of trigger that retain 
> > the elements of the window to allow for processing of late data e.g. 
> > ContinousEventTimeTrigger, then where are the contents of the windows, or 
> > their intermediate computation results, stored, and when is the data 
> > removed?
> >
> > I'm thinking in terms of Google's Dataflow API, setting a windows the 
> > withAllowedLateness option allows the caller to control how long past the 
> > end of a window the data should be maintained.  Does Flink have anything 
> > similar?
> >
> > Thanks,
> >
> > Andy

Reply via email to