Hi Aljoscha,

Thanks for the info!
Andy

On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I imagine you are talking about CountTrigger, DeltaTrigger, and
> Continuous*Trigger. For these we never purge. They are a leftover artifact
> from an earlier approach to implementing windowing strategies that was
> inspired by IBM InfoSphere Streams. Here, all triggers are essentially
> accumulating and elements are evicted by an evictor. This is very flexible
> but makes it hard to implement windowing code efficiently. If you are
> interested, here is a Master's thesis that describes that earlier
> implementation:
> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
>
> These triggers are problematic because they never purge window contents if
> you don't have an evictor that does correct eviction. Also, they don't
> allow incremental aggregation over elements as they arrive, since you don't
> know what the contents of the window will be until the trigger fires and
> the evictor evicts.
>
> So, as a short answer: the accumulating triggers never purge window state
> on their own. I hope this helps somehow.
>
> Cheers,
> Aljoscha
>
> > On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> >
> > Thanks Aljoscha, that's very enlightening.
> >
> > Can you please also explain what the default behaviour is? I.e. if I use
> > one of the accumulating inbuilt triggers, when does the state get purged?
> > (With your info I can now probably work things out, but you may give more
> > insight :)
> >
> > Also, are there plans to add explicit lateness control to Flink core?
> > (I'm aware of the Dataflow integration work.)
> >
> > Thanks again,
> >
> > Andy
> >
> >
> > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi,
> > the window contents are stored in state managed by the window operator
> > at all times until they are purged by a Trigger returning PURGE from one of
> > its on*() methods.
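[Andy:] To make sure I follow the "accumulating trigger plus evictor" model described above, here is a small, self-contained simulation of its mechanics. This is plain Java and not Flink's actual API; `AccumulatingWindowModel`, its constructor parameters, and `onElement` are illustrative names only. It shows why the evictor, not the trigger, is the only thing bounding the window state:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of the InfoSphere-inspired style: a count trigger that
// never purges, paired with an evictor that trims the buffer before each
// evaluation. NOT Flink's API; all names here are illustrative.
public class AccumulatingWindowModel {
    private final Deque<Integer> buffer = new ArrayDeque<>(); // window state; the trigger never clears it
    private final int fireEvery;  // count trigger: fire after every N elements
    private final int keepLast;   // evictor: keep at most the last M elements
    private int sinceLastFire = 0;

    public AccumulatingWindowModel(int fireEvery, int keepLast) {
        this.fireEvery = fireEvery;
        this.keepLast = keepLast;
    }

    /** Adds an element; returns the evaluated window contents if the trigger fired, else null. */
    public List<Integer> onElement(int element) {
        buffer.addLast(element);
        sinceLastFire++;
        if (sinceLastFire < fireEvery) {
            return null; // trigger says CONTINUE; state keeps growing
        }
        sinceLastFire = 0;
        // Trigger fired: the evictor trims the buffer *before* evaluation.
        // Without this step the state would grow without bound.
        while (buffer.size() > keepLast) {
            buffer.removeFirst();
        }
        return new ArrayList<>(buffer);
    }
}
```

With `fireEvery = 3` and `keepLast = 4`, the third element fires with `[1, 2, 3]`, and the sixth fires with `[3, 4, 5, 6]` after the evictor drops the two oldest elements, which matches my reading that only the evictor keeps the state bounded.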
> >
> > Out of the box, Flink does not have something akin to the lateness and
> > cleanup of Google Dataflow. You can, however, implement it yourself using a
> > custom Trigger. This is an example that mimics Google Dataflow:
> >
> > public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
> >     private static final long serialVersionUID = 1L;
> >
> >     private final boolean accumulating;
> >     private final long allowedLateness;
> >
> >     private EventTimeTrigger(boolean accumulating, long allowedLateness) {
> >         this.accumulating = accumulating;
> >         this.allowedLateness = allowedLateness;
> >     }
> >
> >     @Override
> >     public TriggerResult onElement(Object element, long timestamp,
> >             TimeWindow window, TriggerContext ctx) throws Exception {
> >         ctx.registerEventTimeTimer(window.maxTimestamp());
> >         return TriggerResult.CONTINUE;
> >     }
> >
> >     @Override
> >     public TriggerResult onEventTime(long time, TimeWindow window,
> >             TriggerContext ctx) {
> >         if (time == window.maxTimestamp()) {
> >             if (accumulating) {
> >                 // register the cleanup timer if we are accumulating
> >                 // (and allow lateness)
> >                 if (allowedLateness > 0) {
> >                     ctx.registerEventTimeTimer(
> >                             window.maxTimestamp() + allowedLateness);
> >                 }
> >                 return TriggerResult.FIRE;
> >             } else {
> >                 return TriggerResult.FIRE_AND_PURGE;
> >             }
> >         } else if (time == window.maxTimestamp() + allowedLateness) {
> >             return TriggerResult.PURGE;
> >         }
> >
> >         return TriggerResult.CONTINUE;
> >     }
> >
> >     @Override
> >     public TriggerResult onProcessingTime(long time, TimeWindow window,
> >             TriggerContext ctx) throws Exception {
> >         return TriggerResult.CONTINUE;
> >     }
> >
> >     @Override
> >     public String toString() {
> >         return "EventTimeTrigger()";
> >     }
> >
> >     /**
> >      * Creates an event-time trigger that fires once the watermark passes
> >      * the end of the window.
> >      *
> >      * <p>
> >      * Once the trigger fires all elements are discarded. Elements that
> >      * arrive late immediately trigger window evaluation with just this
> >      * one element.
> >      */
> >     public static EventTimeTrigger discarding() {
> >         return new EventTimeTrigger(false, 0L);
> >     }
> >
> >     /**
> >      * Creates an event-time trigger that fires once the watermark passes
> >      * the end of the window.
> >      *
> >      * <p>
> >      * This trigger will not immediately discard all elements once it
> >      * fires. Only after the watermark passes the specified lateness are
> >      * the window elements discarded, without emitting a new result. If a
> >      * late element arrives within the specified lateness the window is
> >      * computed again and a new result is emitted.
> >      */
> >     public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
> >         return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
> >     }
> > }
> >
> > You can specify a lateness, and while that time is not yet reached the
> > windows will remain and late-arriving elements will trigger window emission
> > with the complete window contents.
> >
> > Cheers,
> > Aljoscha
> >
> > > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to understand how the lifecycle of messages / state is
> > > managed by Flink, but I'm failing to find any documentation.
> > >
> > > Specifically, if I'm using a windowed stream and a type of trigger that
> > > retains the elements of the window to allow for processing of late data,
> > > e.g. ContinuousEventTimeTrigger, then where are the contents of the
> > > windows, or their intermediate computation results, stored, and when is
> > > the data removed?
> > >
> > > I'm thinking in terms of Google's Dataflow API, where setting a window's
> > > withAllowedLateness option allows the caller to control how long past the
> > > end of a window the data should be maintained. Does Flink have anything
> > > similar?
> > >
> > > Thanks,
> > >
> > > Andy
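[Andy:] For anyone else following along, the purge timeline of the custom trigger quoted above condenses into the branching of its onEventTime() method. The sketch below restates just that decision logic as self-contained plain Java (the `TriggerTimeline` class and its `Result` enum are illustrative names, not Flink's Trigger interface), so the "when is state dropped" question has a one-screen answer:

```java
// Restatement of the onEventTime() branching from the EventTimeTrigger in
// this thread. Plain Java, not Flink's API; names are illustrative only.
public class TriggerTimeline {
    public enum Result { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

    public static Result onEventTime(long time, long windowEnd,
                                     long allowedLateness, boolean accumulating) {
        if (time == windowEnd) {
            // Watermark reached the end of the window: emit a result.
            // Keep the contents only if we accumulate for late data.
            return accumulating ? Result.FIRE : Result.FIRE_AND_PURGE;
        } else if (time == windowEnd + allowedLateness) {
            // Allowed lateness expired: drop the window state without
            // emitting a new result.
            return Result.PURGE;
        }
        return Result.CONTINUE;
    }
}
```

So for a window ending at 100 with a lateness of 50, the accumulating variant fires at watermark 100, keeps its state (and re-fires on any late element) until watermark 150, then silently purges; the discarding variant fires and purges at 100 in one step. Note that in the original code the cleanup timer is only registered in the accumulating branch, so `allowedLateness` has no effect in discarding mode.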