Fantastic. Sounds like things are moving in the right direction. I'm hoping this will be tiered storage.
Thanks!

On Fri, 15 Jan 2016, 17:04 Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
don’t worry, it’s good to get questions about this stuff. :D

You are right: if Flink is not clever about the state, your JVMs can run out of memory and blow up. We are currently working on several things that should make this more robust:
1) Put Flink windows on Flink’s partitioned state abstraction (for this it needs to be enhanced a bit)
2) Provide more state backends

Having 1) and 2) allows choosing different state backends for the window operations without changing the program. For example, there is a state backend that stores state in-memory; I’m working on a state backend that stores state in RocksDB (on-disk); Gyula Fóra is working on a state backend that stores state in HDFS TFiles (if I’m not mistaken); and he also previously contributed the DB state backend that can store state in a SQL database.

Cheers,
Aljoscha

On 15 Jan 2016, at 16:56, Andrew Coates <big.andy.coa...@gmail.com> wrote:

Hi Aljoscha,

Just thinking on the EventTimeTrigger example you provided (and I'm going to apologise in advance for taking more of your time!): if I go down that route with any long allowedLateness, I'm thinking we'll run into issues with memory use, unless Flink is smart enough, configurable enough, or customisable enough to control where the ageing state is kept.

Thoughts?

Thanks!

Andy

On Fri, 15 Jan 2016 at 15:51 Andrew Coates <big.andy.coa...@gmail.com> wrote:

Hi Aljoscha,

Thanks for the info!

Andy

On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
I imagine you are talking about CountTrigger, DeltaTrigger, and Continuous*Trigger. For these we never purge. They are a leftover artifact from an earlier approach to implementing windowing strategies that was inspired by IBM InfoSphere Streams. There, all triggers are essentially accumulating, and elements are evicted by an evictor. This is very flexible but makes it hard to implement windowing code efficiently. If you are interested, here is a Master's thesis that describes that earlier implementation: http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf

These triggers are problematic because they never purge window contents if you don’t have an evictor that does correct eviction. Also, they don’t allow incremental aggregation over elements as they arrive, since you don’t know what the contents of the window will be until the trigger fires and the evictor evicts.

So, as a short answer: the accumulating triggers never purge window state on their own. I hope this helps somehow.

Cheers,
Aljoscha

On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coa...@gmail.com> wrote:

Thanks Aljoscha, that's very enlightening.

Can you please also explain what the default behaviour is? I.e. if I use one of the accumulating inbuilt triggers, when does the state get purged? (With your info I can now probably work things out, but you may give more insight. :)

Also, are there plans to add explicit lateness control to Flink core? (I'm aware of the Dataflow integration work.)

Thanks again,

Andy

On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
the window contents are stored in state managed by the window operator at all times, until they are purged by a Trigger returning PURGE from one of its on*() methods.

Out of the box, Flink does not have something akin to the lateness and cleanup of Google Dataflow. You can, however, implement it yourself using a custom Trigger.
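As a side note on the accumulating triggers discussed above (e.g. CountTrigger combined with an evictor): their fire-but-never-purge behaviour can be modelled in plain Java with no Flink dependency. CountWindowModel and its method names are invented for illustration; this is a sketch of the semantics, not Flink API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class CountWindowModel {

    private final ArrayDeque<Integer> buffer = new ArrayDeque<>();
    private final int size;   // how many elements the evictor keeps (like CountEvictor.of(size))
    private final int slide;  // fire every `slide` elements (like CountTrigger.of(slide))
    private int sinceLastFire = 0;

    public CountWindowModel(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Returns the fired window contents, or null while the trigger stays CONTINUE. */
    public List<Integer> onElement(int element) {
        buffer.addLast(element);
        if (++sinceLastFire < slide) {
            return null;              // TriggerResult.CONTINUE
        }
        sinceLastFire = 0;            // TriggerResult.FIRE -- note: never PURGE
        while (buffer.size() > size) {
            buffer.removeFirst();     // the evictor, not the trigger, trims old elements
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        // Keep the last 3 elements, fire every 2 elements.
        CountWindowModel window = new CountWindowModel(3, 2);
        for (int i = 1; i <= 6; i++) {
            List<Integer> fired = window.onElement(i);
            if (fired != null) {
                System.out.println(fired); // [1, 2], then [2, 3, 4], then [4, 5, 6]
            }
        }
    }
}
```

Without the eviction loop the buffer grows without bound, which is precisely why these triggers are problematic when no evictor does correct eviction.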
Here is an example that mimics Google Dataflow:

public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final boolean accumulating;
    private final long allowedLateness;

    private EventTimeTrigger(boolean accumulating, long allowedLateness) {
        this.accumulating = accumulating;
        this.allowedLateness = allowedLateness;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time == window.maxTimestamp()) {
            if (accumulating) {
                // register the cleanup timer if we are accumulating (and allow lateness)
                if (allowedLateness > 0) {
                    ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
                }
                return TriggerResult.FIRE;
            } else {
                return TriggerResult.FIRE_AND_PURGE;
            }
        } else if (time == window.maxTimestamp() + allowedLateness) {
            return TriggerResult.PURGE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>
     * Once the trigger fires, all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static EventTimeTrigger discarding() {
        return new EventTimeTrigger(false, 0L);
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>
     * This trigger will not immediately discard all elements once it fires. Only after the
     * watermark passes the specified lateness are the window elements discarded, without
     * emitting a new result. If a late element arrives within the specified lateness,
     * the window is computed again and a new result is emitted.
     */
    public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
        return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
    }
}

You can specify a lateness, and while that time is not yet reached, the windows will remain and late-arriving elements will trigger window emission with the complete window contents.

Cheers,
Aljoscha

On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com> wrote:

Hi,

I'm trying to understand how the lifecycle of messages / state is managed by Flink, but I'm failing to find any documentation.

Specifically, if I'm using a windowed stream and a type of trigger that retains the elements of the window to allow for processing of late data, e.g. ContinuousEventTimeTrigger, then where are the contents of the windows, or their intermediate computation results, stored, and when is the data removed?

I'm thinking in terms of Google's Dataflow API: when setting up a window, the withAllowedLateness option allows the caller to control how long past the end of a window the data should be maintained. Does Flink have anything similar?

Thanks,

Andy
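As a footnote to the thread, the lateness timeline of the accumulating EventTimeTrigger above can be sketched without any Flink dependency. TriggerTimeline and its nested enum are invented names, and the timer registration done by the real trigger is omitted; this only restates the onEventTime() decision logic:

```java
public class TriggerTimeline {

    public enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

    // Simplified restatement of the accumulating EventTimeTrigger's
    // onEventTime() above: fire at the end of the window, purge silently
    // once the allowed lateness has also passed.
    public static TriggerResult onEventTime(long time, long windowMaxTimestamp, long allowedLateness) {
        if (time == windowMaxTimestamp) {
            return TriggerResult.FIRE;   // emit the window, but keep its state for late data
        } else if (time == windowMaxTimestamp + allowedLateness) {
            return TriggerResult.PURGE;  // lateness over: drop the state without emitting
        }
        return TriggerResult.CONTINUE;
    }

    public static void main(String[] args) {
        long windowMax = 59_999L;   // a window covering [0, 60_000)
        long lateness = 10_000L;    // 10 seconds of allowed lateness

        System.out.println(onEventTime(59_999L, windowMax, lateness)); // FIRE
        System.out.println(onEventTime(65_000L, windowMax, lateness)); // CONTINUE
        System.out.println(onEventTime(69_999L, windowMax, lateness)); // PURGE
    }
}
```

During the lateness period, a late element re-registers the already-elapsed end-of-window timer via onElement(), so the real trigger fires again with the full accumulated contents; once the watermark reaches maxTimestamp() + allowedLateness, the state is purged without emitting a new result.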