Thanks Aljoscha, that's very enlightening. Can you please also explain what the default behaviour is? I.e. if I use one of the accumulating inbuilt triggers, when does the state get purged? (With your info I can now probably work things out, but you may give more insight :)
Also, are there plans to add explicit lateness control to Flink core? (I'm aware of the Dataflow integration work.)

Thanks again,

Andy

On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> the window contents are stored in state managed by the window operator at
> all times until they are purged by a Trigger returning PURGE from one of
> its on*() methods.
>
> Out of the box, Flink does not have something akin to the lateness and
> cleanup of Google Dataflow. You can, however, implement it yourself using
> a custom Trigger. This is an example that mimics Google Dataflow:
>
> public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>
>     private final boolean accumulating;
>     private final long allowedLateness;
>
>     private EventTimeTrigger(boolean accumulating, long allowedLateness) {
>         this.accumulating = accumulating;
>         this.allowedLateness = allowedLateness;
>     }
>
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>         if (time == window.maxTimestamp()) {
>             if (accumulating) {
>                 // register the cleanup timer if we are accumulating (and allow lateness)
>                 if (allowedLateness > 0) {
>                     ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
>                 }
>                 return TriggerResult.FIRE;
>             } else {
>                 return TriggerResult.FIRE_AND_PURGE;
>             }
>         } else if (time == window.maxTimestamp() + allowedLateness) {
>             return TriggerResult.PURGE;
>         }
>
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public String toString() {
>         return "EventTimeTrigger()";
>     }
>
>     /**
>      * Creates an event-time trigger that fires once the watermark passes
>      * the end of the window.
>      *
>      * <p>
>      * Once the trigger fires all elements are discarded. Elements that
>      * arrive late immediately trigger window evaluation with just this one element.
>      */
>     public static EventTimeTrigger discarding() {
>         return new EventTimeTrigger(false, 0L);
>     }
>
>     /**
>      * Creates an event-time trigger that fires once the watermark passes
>      * the end of the window.
>      *
>      * <p>
>      * This trigger will not immediately discard all elements once it fires.
>      * Only after the watermark passes the specified lateness are the window
>      * elements discarded, without emitting a new result. If a late element
>      * arrives within the specified lateness the window is computed again
>      * and a new result is emitted.
>      */
>     public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
>         return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
>     }
> }
>
> You can specify a lateness, and while that time is not yet reached the
> windows will remain and late-arriving elements will trigger window
> emission with the complete window contents.
>
> Cheers,
> Aljoscha
>
> > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> >
> > Hi,
> >
> > I'm trying to understand how the lifecycle of messages / state is
> > managed by Flink, but I'm failing to find any documentation.
> >
> > Specifically, if I'm using a windowed stream and a type of trigger that
> > retains the elements of the window to allow for processing of late data,
> > e.g. ContinuousEventTimeTrigger, then where are the contents of the
> > windows, or their intermediate computation results, stored, and when is
> > the data removed?
> >
> > I'm thinking in terms of Google's Dataflow API, where the
> > withAllowedLateness option, set when defining a window, allows the caller
> > to control how long past the end of a window the data should be
> > maintained. Does Flink have anything similar?
> >
> > Thanks,
> >
> > Andy
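For reference, a minimal sketch of how a custom trigger like the one Aljoscha posted might be wired into a job. This is not taken from the thread: the input stream, key, window size, and lateness values are illustrative assumptions, the assigner class name (TumblingEventTimeWindows) may differ between Flink versions, and Time.minutes(10) is assumed to be usable where the accumulating() factory expects an AbstractTime.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical input: (key, value) pairs with event-time timestamps and
// watermarks already assigned upstream.
DataStream<Tuple2<String, Long>> events = ...;

events
    .keyBy(0)                                                  // key by the String field
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))      // one-minute event-time windows (illustrative)
    .trigger(EventTimeTrigger.accumulating(Time.minutes(10)))  // keep window state for 10 minutes of allowed lateness
    .sum(1)                                                     // re-emits the full window sum on each firing, including late ones
    .print();

With the accumulating trigger, each late element arriving within the 10-minute lateness re-fires the window with its complete contents; only after the watermark passes window end plus lateness is the state purged.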