Thanks Aljoscha, that's very enlightening.

Can you please also explain what the default behaviour is? I.e. if I use
one of the built-in accumulating triggers, when does the state get purged?
(With your info I can probably work it out now, but you may have more
insight :)

Also, are there plans to add explicit lateness control to Flink core? (I'm
aware of the Dataflow integration work.)

Thanks again,

Andy

On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> the window contents are stored in state managed by the window operator at
> all times until they are purged by a Trigger returning PURGE from one of
> its on*() methods.
>
> Out of the box, Flink does not have something akin to the lateness and
> cleanup of Google Dataflow. You can, however, implement it yourself using a
> custom Trigger. This is an example that mimics Google Dataflow:
>
> public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
>    private static final long serialVersionUID = 1L;
>
>    private final boolean accumulating;
>    private final long allowedLateness;
>
>    private EventTimeTrigger(boolean accumulating, long allowedLateness) {
>       this.accumulating = accumulating;
>       this.allowedLateness = allowedLateness;
>    }
>
>    @Override
>    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>       ctx.registerEventTimeTimer(window.maxTimestamp());
>       return TriggerResult.CONTINUE;
>    }
>
>    @Override
>    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>       if (time == window.maxTimestamp()) {
>          if (accumulating) {
>             // register the cleanup timer if we are accumulating (and allow lateness)
>             if (allowedLateness > 0) {
>                ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
>             }
>             return TriggerResult.FIRE;
>          } else {
>             return TriggerResult.FIRE_AND_PURGE;
>          }
>       } else if (time == window.maxTimestamp() + allowedLateness) {
>          return TriggerResult.PURGE;
>       }
>
>       return TriggerResult.CONTINUE;
>    }
>
>    @Override
>    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>       return TriggerResult.CONTINUE;
>    }
>
>    @Override
>    public String toString() {
>       return "EventTimeTrigger()";
>    }
>
>    /**
>     * Creates an event-time trigger that fires once the watermark passes
>     * the end of the window.
>     *
>     * <p>
>     * Once the trigger fires all elements are discarded. Elements that
>     * arrive late immediately trigger window evaluation with just this one
>     * element.
>     */
>    public static EventTimeTrigger discarding() {
>       return new EventTimeTrigger(false, 0L);
>    }
>
>    /**
>     * Creates an event-time trigger that fires once the watermark passes
>     * the end of the window.
>     *
>     * <p>
>     * This trigger will not immediately discard all elements once it
>     * fires. Only after the watermark passes the specified lateness are the
>     * window elements discarded, without emitting a new result. If a late
>     * element arrives within the specified lateness, the window is computed
>     * again and a new result is emitted.
>     */
>    public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
>       return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
>    }
> }
>
> You can specify a lateness; until that time has passed, the window state is
> retained and late-arriving elements trigger emission of the complete window
> contents.
>
> Cheers,
> Aljoscha
> > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > I'm trying to understand how the lifecycle of messages / state is
> managed by Flink, but I'm failing to find any documentation.
> >
> > Specifically, if I'm using a windowed stream and a type of trigger that
> retains the elements of the window to allow for processing of late data, e.g.
> ContinuousEventTimeTrigger, then where are the contents of the windows, or
> their intermediate computation results, stored, and when is the data
> removed?
> >
> > I'm thinking in terms of Google's Dataflow API, where setting the
> withAllowedLateness option on a window allows the caller to control how long
> past the end of a window the data should be maintained.  Does Flink have
> anything similar?
> >
> > Thanks,
> >
> > Andy
>
>
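As an aside, the firing/purging sequence that the trigger above implements can be sketched stand-alone, without any Flink dependencies. Note that the `TriggerResult` enum and `onEventTime` method below are simplified stand-ins for illustration, not the actual Flink API:

```java
// Stand-alone simulation of the accumulating/discarding timer logic from
// the trigger above. No Flink dependencies; types are simplified stand-ins.
public class Main {
    enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

    // Mirrors the onEventTime() decision logic: windowEnd plays the role of
    // window.maxTimestamp(), and `time` is the event-time timer that fired.
    static TriggerResult onEventTime(long time, long windowEnd,
                                     boolean accumulating, long allowedLateness) {
        if (time == windowEnd) {
            // Watermark reached the end of the window: emit; keep state
            // around only in accumulating mode.
            return accumulating ? TriggerResult.FIRE : TriggerResult.FIRE_AND_PURGE;
        } else if (time == windowEnd + allowedLateness) {
            // Cleanup timer fired: discard the window state, emit nothing.
            return TriggerResult.PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    public static void main(String[] args) {
        long windowEnd = 1000L, lateness = 500L;
        // Accumulating: emit at end-of-window, purge after the lateness.
        System.out.println(onEventTime(1000L, windowEnd, true, lateness));  // FIRE
        System.out.println(onEventTime(1500L, windowEnd, true, lateness));  // PURGE
        // Discarding: emit and purge in a single step.
        System.out.println(onEventTime(1000L, windowEnd, false, 0L));       // FIRE_AND_PURGE
    }
}
```

In an actual job the trigger itself would be attached to a windowed stream via `.trigger(EventTimeTrigger.accumulating(...))`.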
