Thank you for the response.

I would not mind the second scenario (the late event landing in a second
window), which your illustration suggests is achievable with a custom
trigger. I am not certain, though, that triggers define the lifecycle of a
window: a trigger firing does not necessarily make the window garbage
collectable. The window should be GCed only after the watermark exceeds the
window's leading boundary by a lag, and that boundary may itself keep
increasing. In some cases that might never happen, because the leading
boundary is forever moving forward. We may decide to FIRE_AND_PURGE, FIRE,
etc., but the window remains live. Or did I get that part wrong?
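
To make the concern concrete, here is a toy model in plain Java. It is not
Flink code: the class and method names are made up for illustration, and it
rests on my understanding that a window's state is dropped once the watermark
reaches window.maxTimestamp() plus the allowed lateness. It only shows that
purging the contents does not end the window, and that every in-gap event
pushes the cleanup time further out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one session window for a single key. Hypothetical names,
// not Flink classes. Every event is assumed to fall within the session gap.
class SessionLifecycleSketch {
    final long gap;             // session gap
    final long allowedLateness; // lag after the window end before state is dropped
    boolean live = false;       // true once the first event has arrived
    long end;                   // exclusive window end = latest event time + gap
    final List<Long> contents = new ArrayList<>();

    SessionLifecycleSketch(long gap, long allowedLateness) {
        this.gap = gap;
        this.allowedLateness = allowedLateness;
    }

    // An in-gap event extends the window end, which moves the cleanup time forward.
    void onEvent(long timestamp) {
        end = live ? Math.max(end, timestamp + gap) : timestamp + gap;
        live = true;
        contents.add(timestamp);
    }

    // Models FIRE_AND_PURGE: contents are emitted and dropped, the window stays live.
    List<Long> fireAndPurge() {
        List<Long> emitted = new ArrayList<>(contents);
        contents.clear();
        return emitted;
    }

    // Assumed cleanup rule: max timestamp of the window (end - 1) plus the lateness.
    long cleanupTime() {
        return (end - 1) + allowedLateness;
    }

    // The window becomes collectable only once the watermark reaches the cleanup time.
    boolean isCollectable(long watermark) {
        return live && watermark >= cleanupTime();
    }

    public static void main(String[] args) {
        SessionLifecycleSketch w = new SessionLifecycleSketch(10, 5);
        w.onEvent(0);
        w.onEvent(8);
        w.onEvent(16);
        System.out.println("purged " + w.fireAndPurge().size() + " elements");
        System.out.println("collectable at watermark 20? " + w.isCollectable(20));
        w.onEvent(24); // the session keeps growing after the purge
        System.out.println("cleanup time is now " + w.cleanupTime());
    }
}
```

In this model a steady stream of in-gap events means isCollectable never
becomes true, whatever the trigger returns.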


Vishal.




On Thu, Nov 9, 2017 at 8:24 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> It might be more complicated if you want to take into account events
> arriving out of order. For example, say you limit the length of a window
> to 5 and you get the following events:
>
> 1 2 3 4 6 7 8 5
>
> Do you want to emit windows:
>
> [1 2 3 4 5] (length limit exceeded) + [6 7 8] ?
>
> Or are you fine with interleaving windows in case of out of order:
>
> [1 2 3 4 6] + [5 7 8]
>
> If the latter, a custom Trigger should be enough for you. If not, you
> would need to implement a hypothetical MergingAndSplitableWindowAssigner
> that, after encountering the late event “5”, could split previously created
> windows. Unfortunately, such a feature is not supported by WindowOperator,
> so you would have to implement your own operator for this.
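
The two outcomes for the sequence 1 2 3 4 6 7 8 5 with a limit of 5 can be
reproduced with a small plain-Java sketch. It does not use Flink at all, and
the class and method names are invented for illustration: byArrival cuts a
window after every 5 elements in arrival order (roughly what a count-based
purging trigger would give), while byEventOrder shows the result that would
need the late "5" to be moved into the first window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration with hypothetical names, not Flink code.
class OutOfOrderWindowsSketch {

    // Cuts a new window every `limit` elements, in the order given,
    // then sorts each window's contents for display.
    static List<List<Integer>> chunk(List<Integer> events, int limit) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int i = 0; i < events.size(); i += limit) {
            int to = Math.min(i + limit, events.size());
            List<Integer> window = new ArrayList<>(events.subList(i, to));
            Collections.sort(window);
            windows.add(window);
        }
        return windows;
    }

    // Interleaved result: windows are cut in arrival order.
    static List<List<Integer>> byArrival(List<Integer> events, int limit) {
        return chunk(events, limit);
    }

    // Strict result: windows are cut as if events had arrived in order.
    static List<List<Integer>> byEventOrder(List<Integer> events, int limit) {
        List<Integer> sorted = new ArrayList<>(events);
        Collections.sort(sorted);
        return chunk(sorted, limit);
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 6, 7, 8, 5);
        System.out.println(byArrival(events, 5));    // [[1, 2, 3, 4, 6], [5, 7, 8]]
        System.out.println(byEventOrder(events, 5)); // [[1, 2, 3, 4, 5], [6, 7, 8]]
    }
}
```

The strict variant only works here because the whole input is known up
front; in a stream the first window would already have been emitted when
"5" shows up, which is why it needs the splitting described above.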
>
> Regardless of which option you choose, remember to write some integration tests:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
>
> Piotrek
>
> On 8 Nov 2017, at 21:43, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> I am implementing a bounded session window, but I need to short-circuit
> the session if the session length (in count of events or time) goes beyond
> a configured limit, a very reasonable scenario (bots, etc.). I am using
> the approach listed below. I am not sure, though, whether the window itself
> is being terminated, or whether that is even feasible. Any other approach
> or advice?
>
> import java.io.IOException;
> import java.util.Date;
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>     long maxSessionTime;
>
>     ValueState<Boolean> doneState;
>     private final ValueStateDescriptor<Boolean> cleanupStateDescriptor =
>             new ValueStateDescriptor<>("done", Boolean.class );
>
>     private BoundedEventTimeTrigger(long maxSessionTime) {
>         this.maxSessionTime = maxSessionTime;
>     }
>
>     /**
>      * Creates an event-time trigger that fires once the watermark passes the
>      * end of the window, or fires and purges once an element arrives more
>      * than the given maximum session length after the window start.
>      *
>      * <p>Once the trigger fires all elements are discarded. Elements that
>      * arrive late immediately trigger window evaluation with just this one
>      * element.
>      */
>     public static BoundedEventTimeTrigger create(long maxSessionLength) {
>         return new BoundedEventTimeTrigger(maxSessionLength);
>     }
>
>     @Override
>     public TriggerResult onElement(Object element, long timestamp,
>             TimeWindow window, TriggerContext ctx) throws Exception {
>         // Look up the per-window flag on every call. A Trigger instance is
>         // shared across keys and windows, so the handle is not cached.
>         ValueState<Boolean> done = ctx.getPartitionedState(cleanupStateDescriptor);
>         if (Boolean.TRUE.equals(done.value())) {
>             // The session already exceeded its limit and was purged.
>             return TriggerResult.CONTINUE;
>         }
>         if (timestamp - window.getStart() > maxSessionTime) {
>             System.out.println(
>                     new Date(timestamp) + "\t" + new Date(window.getStart()));
>             done.update(true);
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>
>         if (window.maxTimestamp() <= ctx.getCurrentWatermark() ) {
>             // if the watermark is already past the window fire immediately
>             return TriggerResult.FIRE;
>         } else {
>             ctx.registerEventTimeTimer(window.maxTimestamp());
>             return TriggerResult.CONTINUE;
>         }
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window,
>             TriggerContext ctx) {
>         return time == window.maxTimestamp() ?
>                 TriggerResult.FIRE :
>                 TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window,
>             TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
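>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>         // Also drop the "done" flag so it does not outlive the window.
>         ctx.getPartitionedState(cleanupStateDescriptor).clear();
>     }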
>
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
>
>     @Override
>     public void onMerge(TimeWindow window,
>                         OnMergeContext ctx) {
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>     }
>
>     @Override
>     public String toString() {
>         return "BoundedEventTimeTrigger(" + maxSessionTime + ")";
>     }
>
> }
>
>
>
>
