Thank you for the response. I would not mind the second scenario (the interleaved second window), which your illustration suggests is achievable with a custom trigger. I am not certain, though, that triggers define the lifecycle of a window: a trigger firing does not necessarily make the window garbage collectable. The window should be GCed only after the watermark exceeds the window's leading boundary by a lag, and that boundary can hypothetically keep increasing. In some cases that might never happen, because the leading boundary is forever increasing. We may decide to FIRE_AND_PURGE, FIRE, etc., but the window remains live. Or did I get that part wrong?
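To make the concern concrete, here is a rough plain-Java sketch of the lifecycle as I understand it. It uses no Flink classes: SessionWindow, extend and isCollectable are names made up for illustration, and the cleanup rule (watermark >= maxTimestamp + lag) is my assumption about when the window operator can drop a window.

```java
final class WindowLifecycleSketch {

    /** Hypothetical stand-in for a merged session window; not a Flink class. */
    static final class SessionWindow {
        final long start;
        long end; // exclusive upper bound, grows as the session is extended

        SessionWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }

        /** Extends the session the way a gap-based merge would. */
        void extend(long eventTimestamp, long gap) {
            end = Math.max(end, eventTimestamp + gap);
        }
    }

    /** Assumed cleanup rule: collectable once the watermark reaches maxTimestamp + lag. */
    static boolean isCollectable(SessionWindow w, long watermark, long lag) {
        return watermark >= w.maxTimestamp() + lag;
    }

    public static void main(String[] args) {
        long gap = 10;
        long lag = 5;
        SessionWindow w = new SessionWindow(0, gap);

        // A bot-like key emits an event every 5 time units, so the session never closes.
        for (long ts = 5; ts <= 30; ts += 5) {
            w.extend(ts, gap);
            long watermark = ts; // assumption: the watermark tracks the latest event
            System.out.println("ts=" + ts + " end=" + w.end
                    + " collectable=" + isCollectable(w, watermark, lag));
        }

        // Only once events stop can the watermark catch up with the cleanup time.
        long finalWatermark = w.maxTimestamp() + lag;
        System.out.println("after silence: collectable="
                + isCollectable(w, finalWatermark, lag));
    }
}
```

With events arriving every 5 units and a gap of 10, every iteration of the loop reports collectable=false, whatever a trigger may have fired or purged in the meantime.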
Vishal.

On Thu, Nov 9, 2017 at 8:24 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> It might be more complicated if you want to take into account events
> coming in out of order. For example you limit length of window to 5 and you
> get the following events:
>
> 1 2 3 4 6 7 8 5
>
> Do you want to emit windows:
>
> [1 2 3 4 5] (length limit exceeded) + [6 7 8] ?
>
> Or are you fine with interleaving windows in case of out of order:
>
> [1 2 3 4 6] + [5 7 8]
>
> If the latter one, some custom Trigger should be enough for you. If not,
> you would need to implement hypothetical MergingAndSplitableWindowAssigner,
> that after encountering late event "5" could split previously created
> windows. Unfortunately such feature is not supported by a WindowOperator,
> so you would have to implement your own operator for this.
>
> Regardless of your option remember to write some integration tests:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
>
> Piotrek
>
> On 8 Nov 2017, at 21:43, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> I am implementing a bounded session window but I require to short circuit
> the session if the session length ( in count of events or time ) go beyond
> a configured limit , a very reasonable scenario ( bot etc ) . I am using
> the approach as listed. I am not sure though if the Window itself is being
> terminated and if that is even feasible. Any other approach or advise ?
>
> import java.io.IOException;
> import java.util.Date;
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>
>     private final long maxSessionTime;
>
>     // Descriptor for the per-key, per-window flag set once the session exceeds the limit.
>     private final ValueStateDescriptor<Boolean> cleanupStateDescriptor =
>             new ValueStateDescriptor<>("done", Boolean.class);
>
>     private BoundedEventTimeTrigger(long maxSessionTime) {
>         this.maxSessionTime = maxSessionTime;
>     }
>
>     /**
>      * Creates an event-time trigger that fires once the watermark passes the
>      * end of the window.
>      *
>      * <p>Once the trigger fires all elements are discarded. Elements that
>      * arrive late immediately trigger window evaluation with just this one element.
>      */
>     public static BoundedEventTimeTrigger create(long maxSessionLength) {
>         return new BoundedEventTimeTrigger(maxSessionLength);
>     }
>
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
>             TriggerContext ctx) throws Exception {
>         // Fetch the flag through the context on every call instead of caching it in a
>         // field (the original read an undefined "cleanupState" variable here).
>         ValueState<Boolean> doneState = ctx.getPartitionedState(cleanupStateDescriptor);
>         if (Boolean.TRUE.equals(doneState.value())) {
>             return TriggerResult.CONTINUE;
>         }
>         if (timestamp - window.getStart() > maxSessionTime) {
>             System.out.println(new Date(timestamp) + "\t" + new Date(window.getStart()));
>             try {
>                 doneState.update(true);
>                 return TriggerResult.FIRE_AND_PURGE;
>             } catch (IOException e) {
>                 throw new RuntimeException("Failed to update state", e);
>             }
>         }
>
>         if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>             // if the watermark is already past the window fire immediately
>             return TriggerResult.FIRE;
>         } else {
>             ctx.registerEventTimeTimer(window.maxTimestamp());
>             return TriggerResult.CONTINUE;
>         }
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>         return time == window.maxTimestamp() ?
>                 TriggerResult.FIRE :
>                 TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window,
>             TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>     }
>
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
>
>     @Override
>     public void onMerge(TimeWindow window, OnMergeContext ctx) {
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>     }
>
>     @Override
>     public String toString() {
>         return "BoundedEventTimeTrigger(" + maxSessionTime + ")";
>     }
> }
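
For reference, the two outcomes Piotr describes for the arrival order 1 2 3 4 6 7 8 5 can be reproduced with a small batch sketch. This is plain Java, not Flink code: CountLimitSketch, chunk, byArrival and byEventOrder are hypothetical names, and sorting the whole input up front merely stands in for the window splitting that a real streaming operator would have to do incrementally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class CountLimitSketch {

    /** Cuts the events into windows of at most `limit` elements, in the order given. */
    static List<List<Integer>> chunk(List<Integer> events, int limit) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int i = 0; i < events.size(); i += limit) {
            int to = Math.min(i + limit, events.size());
            List<Integer> window = new ArrayList<>(events.subList(i, to));
            Collections.sort(window); // sort inside the window for display only
            windows.add(window);
        }
        return windows;
    }

    /** Interleaved result: what a count-limiting trigger would see, in arrival order. */
    static List<List<Integer>> byArrival(List<Integer> arrivals, int limit) {
        return chunk(arrivals, limit);
    }

    /** Strict result: windows as if the late event had been slotted into place. */
    static List<List<Integer>> byEventOrder(List<Integer> arrivals, int limit) {
        List<Integer> sorted = new ArrayList<>(arrivals);
        Collections.sort(sorted);
        return chunk(sorted, limit);
    }

    public static void main(String[] args) {
        List<Integer> arrivals = Arrays.asList(1, 2, 3, 4, 6, 7, 8, 5);
        System.out.println(byArrival(arrivals, 5));    // [[1, 2, 3, 4, 6], [5, 7, 8]]
        System.out.println(byEventOrder(arrivals, 5)); // [[1, 2, 3, 4, 5], [6, 7, 8]]
    }
}
```

The first call needs nothing beyond the events seen so far, which is why a custom trigger is enough for it; the second needs the late event 5 before the first window can be finalized.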