Hi Aljoscha,

Thanks for the info!

Andy

On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I imagine you are talking about CountTrigger, DeltaTrigger, and
> Continuous*Trigger. For these we never purge. They are a leftover artifact
> from an earlier approach to implementing windowing strategies that was
> inspired by IBM InfoSphere Streams. Here, all triggers are essentially
> accumulating and elements are evicted by an evictor. This is very flexible
> but makes it hard to implement windowing code efficiently. If you are
> interested, here is a master's thesis that describes that earlier
> implementation:
> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
>
> These triggers are problematic because they never purge window contents if
> you don’t have an evictor that does correct eviction. Also, they don’t
> allow incremental aggregation over elements as they arrive since you don’t
> know what will be the contents of the window until the trigger fires and
> the evictor evicts.
>
> So, as a short answer: the accumulating triggers never purge window state
> on their own. I hope this helps somehow.
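[Editor's note: the accumulate-then-evict model described above can be sketched without any Flink dependency. The class and method names below are illustrative only, not Flink API; the point is that the "trigger" never purges, so the buffer only shrinks because the "evictor" step removes the oldest element after each firing. Without that eviction step, state would grow without bound.]

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Framework-free sketch of an accumulating count trigger paired with an
// evictor: fire once the buffer holds `windowSize` elements, emit the
// contents, and evict only the oldest element so the window slides.
public class CountWindowSketch {
    private final Deque<Integer> buffer = new ArrayDeque<>();
    private final int windowSize;

    CountWindowSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    // Returns the window contents when the count trigger fires, else null.
    List<Integer> onElement(int element) {
        buffer.addLast(element);
        if (buffer.size() < windowSize) {
            return null; // trigger says CONTINUE
        }
        // FIRE: emit a snapshot, but do NOT purge the window state.
        List<Integer> contents = new ArrayList<>(buffer);
        // Evictor drops the oldest element; without this line the buffer
        // would grow forever, which is the leak described above.
        buffer.removeFirst();
        return contents;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(3);
        for (int i = 1; i <= 5; i++) {
            List<Integer> fired = window.onElement(i);
            if (fired != null) {
                System.out.println("FIRE with contents " + fired);
            }
        }
    }
}
```

With elements 1 through 5 and a window size of 3, this fires `[1, 2, 3]`, `[2, 3, 4]`, and `[3, 4, 5]` — each element stays in state until eviction pushes it out, never because the trigger purged it.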
>
> Cheers,
> Aljoscha
> > On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coa...@gmail.com>
> wrote:
> >
> > Thanks Aljoscha, that's very enlightening.
> >
> > Can you please also explain what the default behaviour is? I.e. if I use
> one of the inbuilt accumulating triggers, when does the state get purged?
> (With your info I can now probably work things out, but you may give more
> insight :)
> >
> > Also, are there plans to add explicit lateness control to flink core?
> (I'm aware of the Dataflow integration work.)
> >
> > Thanks again,
> >
> > Andy
> >
> >
> > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi,
> > the window contents are stored in state managed by the window operator
> at all times until they are purged by a Trigger returning PURGE from one of
> its on*() methods.
> >
> > Out of the box, Flink does not have something akin to the lateness and
> cleanup of Google Dataflow. You can, however, implement it yourself using a
> custom Trigger. Here is an example that mimics Google Dataflow:
> >
> > public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
> >    private static final long serialVersionUID = 1L;
> >
> >    private final boolean accumulating;
> >    private final long allowedLateness;
> >
> >    private EventTimeTrigger(boolean accumulating, long allowedLateness) {
> >       this.accumulating = accumulating;
> >       this.allowedLateness = allowedLateness;
> >    }
> >
> >    @Override
> >    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
> >       ctx.registerEventTimeTimer(window.maxTimestamp());
> >       return TriggerResult.CONTINUE;
> >    }
> >
> >    @Override
> >    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
> >       if (time == window.maxTimestamp()) {
> >          if (accumulating) {
> >             // register the cleanup timer if we are accumulating (and allow lateness)
> >             if (allowedLateness > 0) {
> >                ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
> >             }
> >             return TriggerResult.FIRE;
> >          } else {
> >             return TriggerResult.FIRE_AND_PURGE;
> >          }
> >       } else if (time == window.maxTimestamp() + allowedLateness) {
> >          return TriggerResult.PURGE;
> >       }
> >
> >       return TriggerResult.CONTINUE;
> >    }
> >
> >    @Override
> >    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
> >       return TriggerResult.CONTINUE;
> >    }
> >
> >    @Override
> >    public String toString() {
> >       return "EventTimeTrigger()";
> >    }
> >
> >    /**
> >     * Creates an event-time trigger that fires once the watermark passes the end of the window.
> >     *
> >     * <p>
> >     * Once the trigger fires all elements are discarded. Elements that arrive late immediately
> >     * trigger window evaluation with just this one element.
> >     */
> >    public static EventTimeTrigger discarding() {
> >       return new EventTimeTrigger(false, 0L);
> >    }
> >
> >    /**
> >     * Creates an event-time trigger that fires once the watermark passes the end of the window.
> >     *
> >     * <p>
> >     * This trigger will not immediately discard all elements once it fires. Only after the
> >     * watermark passes the specified lateness are the window elements discarded, without
> >     * emitting a new result. If a late element arrives within the specified lateness
> >     * the window is computed again and a new result is emitted.
> >     */
> >    public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
> >       return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
> >    }
> > }
> >
> > You can specify a lateness; until that time is reached the windows will
> > remain, and late-arriving elements will trigger window emission with the
> > complete window contents.
> >
> > Cheers,
> > Aljoscha
> > > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com>
> wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to understand how the lifecycle of messages / state is
> managed by Flink, but I'm failing to find any documentation.
> > >
> > > Specifically, if I'm using a windowed stream and a type of trigger that
> > retains the elements of the window to allow for processing of late data,
> > e.g. ContinuousEventTimeTrigger, then where are the contents of the windows,
> > or their intermediate computation results, stored, and when is the data
> > removed?
> > >
> > > I'm thinking in terms of Google's Dataflow API, where the
> > withAllowedLateness option, set when defining a window, allows the caller to
> > control how long past the end of a window the data should be maintained.
> > Does Flink have anything similar?
> > >
> > > Thanks,
> > >
> > > Andy
> >
>
>
