The window contents are stored in state managed by the window operator at all 
times until they are purged by a Trigger returning PURGE (or FIRE_AND_PURGE) 
from one of its on*() methods.
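
The FIRE/PURGE distinction can be made concrete with the tiny plain-Java model below. It is not Flink code. `SimpleTriggerResult`, `WindowContents` and `WindowLifecycleDemo` are hypothetical stand-ins that mimic how a window operator might act on each trigger result: fire emits a snapshot of the contents, purge clears the state, and CONTINUE does neither.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Flink's TriggerResult: each value records
// whether the window is emitted (fire) and whether its state is dropped (purge).
enum SimpleTriggerResult {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    final boolean fire;
    final boolean purge;

    SimpleTriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

// Hypothetical model of the per-window state held by the window operator.
class WindowContents {
    private final List<Integer> elements = new ArrayList<>();
    final List<List<Integer>> emitted = new ArrayList<>();

    void add(int element) {
        elements.add(element);
    }

    // Apply a trigger decision: fire emits a copy of the contents,
    // purge clears the state. CONTINUE does neither.
    void apply(SimpleTriggerResult result) {
        if (result.fire) {
            emitted.add(new ArrayList<>(elements));
        }
        if (result.purge) {
            elements.clear();
        }
    }

    int size() {
        return elements.size();
    }
}

class WindowLifecycleDemo {
    public static void main(String[] args) {
        WindowContents window = new WindowContents();
        window.add(1);
        window.add(2);
        window.apply(SimpleTriggerResult.FIRE);   // emits [1, 2], keeps the state
        window.add(3);                            // a late element
        window.apply(SimpleTriggerResult.FIRE);   // emits [1, 2, 3]
        window.apply(SimpleTriggerResult.PURGE);  // state dropped, nothing emitted
        // prints [[1, 2], [1, 2, 3]] remaining=0
        System.out.println(window.emitted + " remaining=" + window.size());
    }
}
```

Only a result with the purge flag set frees the state; a trigger that only ever returns FIRE keeps the window around indefinitely.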

Out of the box, Flink does not have anything akin to the allowed lateness and 
cleanup of Google Dataflow. You can, however, implement it yourself using a 
custom Trigger. Here is an example that mimics Google Dataflow's behavior:

import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
   private static final long serialVersionUID = 1L;

   private final boolean accumulating;
   private final long allowedLateness;

   private EventTimeTrigger(boolean accumulating, long allowedLateness) {
      this.accumulating = accumulating;
      this.allowedLateness = allowedLateness;
   }

   @Override
   public TriggerResult onElement(Object element, long timestamp,
         TimeWindow window, TriggerContext ctx) throws Exception {
      // Ask to be called back once the watermark passes the end of the window.
      // Without this timer onEventTime() would never be invoked.
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onEventTime(long time, TimeWindow window,
         TriggerContext ctx) {
      if (time == window.maxTimestamp()) {
         if (accumulating && allowedLateness > 0) {
            // Keep the contents and register the cleanup timer at
            // end of window + allowed lateness.
            ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
            return TriggerResult.FIRE;
         } else {
            // Nothing to wait for: emit the result and drop the state.
            return TriggerResult.FIRE_AND_PURGE;
         }
      } else if (time == window.maxTimestamp() + allowedLateness) {
         // The allowed lateness has passed: drop the state without emitting.
         return TriggerResult.PURGE;
      }
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onProcessingTime(long time, TimeWindow window,
         TriggerContext ctx) throws Exception {
      // This trigger only reacts to event time.
      return TriggerResult.CONTINUE;
   }

   @Override
   public String toString() {
      return "EventTimeTrigger()";
   }

   /**
    * Creates an event-time trigger that fires once the watermark passes the
    * end of the window.
    * <p>
    * Once the trigger fires, all elements are discarded. Elements that arrive
    * late trigger a new window evaluation containing just that one element.
    */
   public static EventTimeTrigger discarding() {
      return new EventTimeTrigger(false, 0L);
   }

   /**
    * Creates an event-time trigger that fires once the watermark passes the
    * end of the window.
    * <p>
    * This trigger does not discard the window elements when it fires. Only
    * after the watermark passes the end of the window plus the specified
    * lateness are the elements discarded, without emitting a new result. If a
    * late element arrives within the specified lateness, the window is
    * computed again and a new result is emitted.
    */
   public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
      return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
   }
}

You can specify a lateness: until the watermark passes the end of the window 
plus that lateness, the window contents are kept, and late-arriving elements 
trigger a new emission with the complete window contents.
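
The timing can be checked without running a Flink job. The sketch below is plain Java, not Flink API: `Decision` and `LatenessTimeline` are hypothetical names, and `onTimer` mirrors the `onEventTime()` decision of the trigger above as a pure function, treating an accumulating trigger with zero lateness like a discarding one. It assumes a `TimeWindow` whose `maxTimestamp()` is its end minus one millisecond (so the window [0, 10000) has maxTimestamp 9999) and that a timer fires once the watermark reaches its timestamp. With 5 seconds of lateness, the cleanup timer therefore sits at 14999.

```java
// Hypothetical decision values mirroring Flink's TriggerResult names.
enum Decision { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Plain-Java replay of the trigger's event-time logic (no Flink dependency).
final class LatenessTimeline {
    private LatenessTimeline() {}

    // Decision for an event-time timer firing at `time`, for a window whose
    // last valid timestamp is `maxTimestamp`.
    static Decision onTimer(long time, long maxTimestamp,
                            boolean accumulating, long allowedLateness) {
        if (time == maxTimestamp) {
            // Accumulating with lateness keeps the state; otherwise emit and drop it.
            return (accumulating && allowedLateness > 0)
                    ? Decision.FIRE : Decision.FIRE_AND_PURGE;
        } else if (time == maxTimestamp + allowedLateness) {
            return Decision.PURGE;
        }
        return Decision.CONTINUE;
    }

    // Accumulating mode only: true while the cleanup timer has not fired yet,
    // i.e. a late element would still be combined with the earlier contents.
    // Assumes a timer fires once the watermark reaches its timestamp.
    static boolean stillHoldsContents(long watermark, long maxTimestamp,
                                      long allowedLateness) {
        return watermark < maxTimestamp + allowedLateness;
    }

    public static void main(String[] args) {
        long maxTimestamp = 10_000L - 1;  // TimeWindow [0, 10000)
        long lateness = 5_000L;           // 5 seconds of allowed lateness
        System.out.println(onTimer(maxTimestamp, maxTimestamp, true, lateness));            // FIRE
        System.out.println(onTimer(maxTimestamp + lateness, maxTimestamp, true, lateness)); // PURGE
        System.out.println(stillHoldsContents(12_000L, maxTimestamp, lateness));            // true
    }
}
```

In an actual job the trigger would be attached to a windowed stream through `WindowedStream.trigger(...)`, for example `.timeWindow(Time.of(10, TimeUnit.SECONDS)).trigger(EventTimeTrigger.accumulating(Time.of(5, TimeUnit.SECONDS)))`, assuming the Flink 0.10 `Time.of` helper.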

> On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> Hi, 
> I'm trying to understand how the lifecycle of messages / state is managed by 
> Flink, but I'm failing to find any documentation.
> Specifically, if I'm using a windowed stream and a type of trigger that retains 
> the elements of the window to allow for processing of late data, e.g. 
> ContinuousEventTimeTrigger, then where are the contents of the windows, or 
> their intermediate computation results, stored, and when is the data removed?
> I'm thinking in terms of Google's Dataflow API, where setting the 
> withAllowedLateness option on a window allows the caller to control how long 
> past the end of a window the data should be maintained. Does Flink have 
> anything similar?
> Thanks,
> Andy
