Hi,
The window contents are kept in state managed by the window operator until they are purged by a Trigger returning PURGE from one of its on*() methods.

Out of the box, Flink does not have anything akin to the lateness and cleanup behaviour of Google Dataflow. You can, however, implement it yourself using a custom Trigger. Here is an example that mimics Google Dataflow:

// Imports assuming the Flink package layout current at the time of this thread:
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
   private static final long serialVersionUID = 1L;

   private final boolean accumulating;
   private final long allowedLateness;

   private EventTimeTrigger(boolean accumulating, long allowedLateness) {
      this.accumulating = accumulating;
      this.allowedLateness = allowedLateness;
   }

   @Override
   public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
      if (time == window.maxTimestamp()) {
         if (accumulating) {
            // register the cleanup timer if we are accumulating (and allow lateness)
            if (allowedLateness > 0) {
               ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
            }
            return TriggerResult.FIRE;
         } else {
            return TriggerResult.FIRE_AND_PURGE;
         }
      } else if (time == window.maxTimestamp() + allowedLateness) {
         return TriggerResult.PURGE;
      }

      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
   }

   @Override
   public String toString() {
      return "EventTimeTrigger()";
   }

   /**
    * Creates an event-time trigger that fires once the watermark passes the end of the window.
    *
    * <p>
    * Once the trigger fires all elements are discarded. Elements that arrive late immediately
    * trigger window evaluation with just this one element.
    */
   public static EventTimeTrigger discarding() {
      return new EventTimeTrigger(false, 0L);
   }

   /**
    * Creates an event-time trigger that fires once the watermark passes the end of the window.
    *
    * <p>
    * This trigger will not immediately discard all elements once it fires. Only after the
    * watermark passes the specified lateness are the window elements discarded, without
    * emitting a new result. If a late element arrives within the specified lateness
    * the window is computed again and a new result is emitted.
    */
   public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
      return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
   }
}

You can specify a lateness; as long as that time has not yet been reached, the window contents remain in state, and late-arriving elements trigger window emission with the complete window contents.
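
For illustration, here is a minimal sketch of how such a trigger could be attached to a windowed stream. The input stream, key position, window size and lateness below are placeholders, and the keyBy/timeWindow/trigger/sum calls are assumed to match the DataStream API of the Flink version discussed in this thread:

// Sketch only: "input" stands for some DataStream<Tuple2<String, Long>>;
// the window size and allowed lateness are arbitrary example values.
DataStream<Tuple2<String, Long>> result = input
      .keyBy(0)                                      // key by the first tuple field
      .timeWindow(Time.of(5, TimeUnit.MINUTES))      // 5-minute event-time windows
      .trigger(EventTimeTrigger.accumulating(        // keep window contents for 1 extra minute
            Time.of(1, TimeUnit.MINUTES)))
      .sum(1);                                       // a late firing re-emits the updated sum

With the accumulating variant the window keeps firing on late elements until the cleanup timer at maxTimestamp() + allowedLateness purges the state; with the discarding variant each late element triggers evaluation with just that one element.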

Cheers,
Aljoscha
> On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coa...@gmail.com> wrote:
> 
> Hi, 
> 
> I'm trying to understand how the lifecycle of messages / state is managed by 
> Flink, but I'm failing to find any documentation.
> 
> Specifically, if I'm using a windowed stream and a type of trigger that retains 
> the elements of the window to allow for processing of late data, e.g. 
> ContinuousEventTimeTrigger, then where are the contents of the windows, or 
> their intermediate computation results, stored, and when is the data removed?
> 
> I'm thinking in terms of Google's Dataflow API, where setting a window's 
> withAllowedLateness option allows the caller to control how long past the end 
> of a window the data should be maintained. Does Flink have anything similar?
> 
> Thanks,
> 
> Andy
