Fantastic. Sounds like things are moving in the right direction. I'm hoping
this will be tiered storage.

Thanks!

On Fri, 15 Jan 2016, 17:04 Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> don’t worry, it’s good to get questions about this stuff. :D
>
> You are right, if Flink is not clever about the state your JVMs can run
> out of memory and blow up. We are currently working on several things that
> should make this more robust:
> 1) Put Flink Windows on Flink’s partitioned state abstraction (for this it
> needs to be enhanced a bit)
> 2) Provide more State Backends
>
> Having 1) and 2) allows choosing different state backends for the window
> operations without changing the program. For example, there is a state
> backend that stores state in-memory, I’m working on a state backend that
> stores state in RocksDB (on-disk), Gyula Fóra is working on a state backend
> that stores state in HDFS TFiles (if I’m not mistaken) and he also
> previously contributed the DB state backend that can store state in a SQL
> database.
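The pluggable-backend idea described above can be sketched as a single interface that window state is written through, so the program itself never changes when the backend does. This is a minimal standalone illustration, not Flink's actual API; all names below (StateBackend, MemoryBackend, countEvent) are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a pluggable state backend: the window operator
// talks only to this interface, so swapping backends (in-memory, RocksDB,
// HDFS, SQL, ...) does not change the program.
interface StateBackend {
    void put(String key, long value);
    Long get(String key);
}

// In-memory variant: fast, but state must fit on the JVM heap.
class MemoryBackend implements StateBackend {
    private final Map<String, Long> state = new HashMap<>();
    public void put(String key, long value) { state.put(key, value); }
    public Long get(String key) { return state.get(key); }
}

public class BackendDemo {
    // The "program": counts an event per window key, unaware of where
    // the state actually lives.
    static long countEvent(StateBackend backend, String windowKey) {
        Long current = backend.get(windowKey);
        long updated = (current == null ? 0L : current) + 1;
        backend.put(windowKey, updated);
        return updated;
    }

    public static void main(String[] args) {
        StateBackend backend = new MemoryBackend(); // swap implementation here
        countEvent(backend, "window-1");
        long count = countEvent(backend, "window-1");
        System.out.println(count); // 2
    }
}
```

An on-disk variant would implement the same interface, which is what lets the window operator stay unchanged while the state outgrows the heap.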
>
> Cheers,
> Aljoscha
>
> On 15 Jan 2016, at 16:56, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> >
> >
> > Hi Aljoscha,
> >
> > Just thinking on the EventTimeTrigger example you provided (and I'm
> going to apologise in advance for taking more of your time!): I'm thinking
> that should I go down that route with any long allowedLateness, we'll run
> into issues with memory use, unless Flink is smart enough, configurable
> enough, or customisable enough to let me control where the ageing state is
> kept.
> >
> > Thoughts?
> >
> > Thanks!
> >
> > Andy
> >
> > On Fri, 15 Jan 2016 at 15:51 Andrew Coates <big.andy.coa...@gmail.com>
> wrote:
> > Hi Aljoscha,
> >
> > Thanks for the info!
> >
> > Andy
> >
> > On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > Hi,
> > I imagine you are talking about CountTrigger, DeltaTrigger, and
> Continuous*Trigger. For these we never purge. They are a leftover artifact
> from an earlier approach to implementing windowing strategies that was
> inspired by IBM InfoSphere Streams. Here, all triggers are essentially
> accumulating and elements are evicted by an evictor. This is very flexible
> but makes it hard to implement windowing code efficiently. If you are
> interested here is a Master Thesis that describes that earlier
> implementation:
> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
> >
> > These triggers are problematic because they never purge window contents
> if you don’t have an evictor that does correct eviction. Also, they don’t
> allow incremental aggregation over elements as they arrive since you don’t
> know what will be the contents of the window until the trigger fires and
> the evictor evicts.
> >
> > So, as a short answer: the accumulating triggers never purge window
> state on their own. I hope this helps somehow.
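The fire-without-purge behaviour described above can be shown in a tiny standalone sketch (plain Java, no Flink types; the class and method names are illustrative only): each FIRE evaluates the full buffer, and without an evictor the buffer only ever grows.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration (not Flink code) of an accumulating count trigger:
// it FIREs every `countTrigger` elements but never purges, so the window
// buffer keeps growing unless an evictor trims it.
public class AccumulatingWindow {
    private final List<Integer> buffer = new ArrayList<>();
    private final int countTrigger;

    AccumulatingWindow(int countTrigger) { this.countTrigger = countTrigger; }

    // Returns the full window contents on FIRE, or null on CONTINUE.
    List<Integer> onElement(int element) {
        buffer.add(element);
        if (buffer.size() % countTrigger == 0) {
            return new ArrayList<>(buffer); // FIRE: emit, but do NOT purge
        }
        return null; // CONTINUE
    }

    int stateSize() { return buffer.size(); }

    public static void main(String[] args) {
        AccumulatingWindow window = new AccumulatingWindow(2);
        for (int i = 1; i <= 6; i++) {
            List<Integer> fired = window.onElement(i);
            if (fired != null) {
                System.out.println("FIRE with " + fired.size() + " elements");
            }
        }
        // State was never purged: all 6 elements are still buffered.
        System.out.println("retained state: " + window.stateSize());
    }
}
```

This also shows why incremental aggregation is hard in this model: the window contents are not known until the trigger fires and an evictor has had its say.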
> >
> > Cheers,
> > Aljoscha
> > > On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coa...@gmail.com>
> wrote:
> > >
> > > Thanks Aljoscha, that's very enlightening.
> > >
> > > Can you please also explain what the default behaviour is? I.e. if I
> use one of the accumulating inbuilt triggers, when does the state get
> purged? (With your info I can now probably work things out, but you may
> give more insight :)
> > >
> > > Also, are there plans to add explicit lateness control to Flink core?
> (I'm aware of the Dataflow integration work.)
> > >
> > > Thanks again,
> > >
> > > Andy
> > >
> > >
> > > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > > Hi,
> > > the window contents are stored in state managed by the window operator
> at all times until they are purged by a Trigger returning PURGE from one of
> its on*() methods.
> > >
> > > Out of the box, Flink does not have something akin to the lateness and
> cleanup of Google Dataflow. You can, however, implement it yourself using a
> custom Trigger. This is an example that mimics Google Dataflow:
> > >
> > > public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
> > >    private static final long serialVersionUID = 1L;
> > >
> > >    private final boolean accumulating;
> > >    private final long allowedLateness;
> > >
> > >    private EventTimeTrigger(boolean accumulating, long
> allowedLateness) {
> > >       this.accumulating = accumulating;
> > >       this.allowedLateness = allowedLateness;
> > >    }
> > >
> > >    @Override
> > >    public TriggerResult onElement(Object element, long timestamp,
> TimeWindow window, TriggerContext ctx) throws Exception {
> > >       ctx.registerEventTimeTimer(window.maxTimestamp());
> > >       return TriggerResult.CONTINUE;
> > >    }
> > >
> > >    @Override
> > >    public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) {
> > >       if (time == window.maxTimestamp()) {
> > >          if (accumulating) {
> > >             // register the cleanup timer if we are accumulating (and
> allow lateness)
> > >             if (allowedLateness > 0) {
> > >                ctx.registerEventTimeTimer(window.maxTimestamp() +
> allowedLateness);
> > >             }
> > >             return TriggerResult.FIRE;
> > >          } else {
> > >             return TriggerResult.FIRE_AND_PURGE;
> > >          }
> > >       } else if (time == window.maxTimestamp() + allowedLateness) {
> > >          return TriggerResult.PURGE;
> > >       }
> > >
> > >       return TriggerResult.CONTINUE;
> > >    }
> > >
> > >    @Override
> > >    public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
> > >       return TriggerResult.CONTINUE;
> > >    }
> > >
> > >    @Override
> > >    public String toString() {
> > >       return "EventTimeTrigger()";
> > >    }
> > >
> > >    /**
> > >     * Creates an event-time trigger that fires once the watermark
> passes the end of the window.
> > >     *
> > >     * <p>
> > >     * Once the trigger fires all elements are discarded. Elements that
> arrive late immediately
> > >     * trigger window evaluation with just this one element.
> > >     */
> > >    public static EventTimeTrigger discarding() {
> > >       return new EventTimeTrigger(false, 0L);
> > >    }
> > >
> > >    /**
> > >     * Creates an event-time trigger that fires once the watermark
> passes the end of the window.
> > >     *
> > >     * <p>
> > >     * This trigger will not immediately discard all elements once it
> fires. Only after the
> > >     * watermark passes the specified lateness are the window elements
> discarded, without
> > >     * emitting a new result. If a late element arrives within the
> specified lateness
> > >     * the window is computed again and a new result is emitted.
> > >     */
> > >    public static EventTimeTrigger accumulating(AbstractTime
> allowedLateness) {
> > >       return new EventTimeTrigger(true,
> allowedLateness.toMilliseconds());
> > >    }
> > > }
> > >
> > > You can specify a lateness and while that time is not yet reached the
> windows will remain and late arriving elements will trigger window emission
> with the complete window contents.
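The FIRE/PURGE timeline of the quoted trigger can be exercised standalone. This is a minimal re-statement of the onEventTime branches above in plain Java, with no Flink types, so the decisions can be checked in isolation (the TriggerLogic class and Result enum are illustrative stand-ins):

```java
// Standalone re-statement of the onEventTime decision logic from the
// EventTimeTrigger above, so the FIRE / PURGE timeline can be checked
// without any Flink dependencies.
public class TriggerLogic {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

    static Result onEventTime(long time, long windowMaxTimestamp,
                              boolean accumulating, long allowedLateness) {
        if (time == windowMaxTimestamp) {
            // End of window: accumulating mode keeps the state around
            // for late elements; discarding mode drops it immediately.
            return accumulating ? Result.FIRE : Result.FIRE_AND_PURGE;
        } else if (time == windowMaxTimestamp + allowedLateness) {
            // Lateness expired: drop the state without emitting.
            return Result.PURGE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        long windowEnd = 1000L;
        long lateness = 500L;
        System.out.println(onEventTime(1000L, windowEnd, true, lateness));  // FIRE
        System.out.println(onEventTime(1500L, windowEnd, true, lateness));  // PURGE
        System.out.println(onEventTime(1000L, windowEnd, false, lateness)); // FIRE_AND_PURGE
    }
}
```

In accumulating mode the state thus survives for exactly allowedLateness past the end of the window, which is also why a long lateness directly translates into longer-lived state.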
> > >
> > > Cheers,
> > > Aljoscha
> > > > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com>
> wrote:
> > > >
> > > > Hi,
> > > >
> > > > I'm trying to understand how the lifecycle of messages / state is
> managed by Flink, but I'm failing to find any documentation.
> > > >
> > > > Specifically, if I'm using a windowed stream and a type of trigger
> that retains the elements of the window to allow for processing of late
> data, e.g. ContinuousEventTimeTrigger, then where are the contents of the
> windows, or
> their intermediate computation results, stored, and when is the data
> removed?
> > > >
> > > > I'm thinking in terms of Google's Dataflow API, where setting the
> withAllowedLateness option on a window allows the caller to control how
> long past the end of a window the data should be maintained. Does Flink
> have anything similar?
> > > >
> > > > Thanks,
> > > >
> > > > Andy
> > >
> >
>
>
