I am implementing a bounded session window, but I need to short-circuit
the session if the session length (in count of events or in time) goes beyond
a configured limit, which is a very reasonable scenario (bots etc.). I am using
the approach listed below. I am not sure, though, whether the window itself is being
terminated, or whether that is even feasible. Any other approach or advice?

/**
 * Event-time trigger that behaves like a plain "fire when the watermark passes
 * the end of the window" trigger, but additionally short-circuits a window
 * whose span exceeds a configured limit.
 *
 * <p>When an element arrives whose timestamp is more than
 * {@code maxSessionTime} past the window start, the trigger fires and purges
 * the window and records a per-window "done" flag. While that flag is set,
 * further elements for the same window no longer cause {@code onElement} to
 * fire.
 *
 * <p>NOTE(review): the "done" flag is plain per-window {@code ValueState}.
 * {@link #onMerge} does not carry it over to the merged window, so with
 * merging (session) window assigners the flag is presumably lost whenever the
 * window is extended by a merge -- confirm against the window operator, and
 * consider a mergeable state type if the flag has to survive merges.
 *
 * <p>NOTE(review): the end-of-window timer registered by this trigger still
 * returns {@code FIRE} after a short circuit, so elements buffered after the
 * cut-off are emitted when the watermark passes the window end -- confirm
 * whether that is intended.
 */
public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /**
     * Maximum allowed distance between the window start and an element
     * timestamp, expressed in the same unit as those two values.
     */
    private final long maxSessionTime;

    /**
     * Descriptor of the per-window flag that marks a window as short-circuited.
     * The state handle itself is always fetched from the trigger context of the
     * current invocation and is never cached on the trigger instance.
     */
    private final ValueStateDescriptor<Boolean> doneStateDescriptor =
            new ValueStateDescriptor<>("done", Boolean.class);

    private BoundedEventTimeTrigger(long maxSessionTime) {
        this.maxSessionTime = maxSessionTime;
    }

    /**
     * Creates a trigger that fires once the watermark passes the end of the
     * window, and that fires and purges early as soon as an element lies more
     * than the given limit past the window start.
     *
     * @param maxSessionLengh maximum allowed difference between an element
     *                        timestamp and the window start
     * @return a new trigger configured with that limit
     */
    public static BoundedEventTimeTrigger create(long maxSessionLengh) {
        return new BoundedEventTimeTrigger(maxSessionLengh);
    }

    /**
     * Decides what to do with the window when a new element is added.
     *
     * @return {@code CONTINUE} if the window was already short-circuited;
     *         {@code FIRE_AND_PURGE} if this element exceeds the limit;
     *         {@code FIRE} if the watermark is already past the window end;
     *         otherwise {@code CONTINUE} after registering the end-of-window timer
     * @throws Exception if the "done" state cannot be read or updated
     */
    @Override
    public TriggerResult onElement(Object element, long timestamp,
            TimeWindow window, TriggerContext ctx) throws Exception {
        // Always resolve the state through the context of this invocation.
        ValueState<Boolean> done = ctx.getPartitionedState(doneStateDescriptor);

        // An unset flag reads as null; only an explicit TRUE means "already cut off".
        if (Boolean.TRUE.equals(done.value())) {
            return TriggerResult.CONTINUE;
        }

        if (timestamp - window.getStart() > maxSessionTime) {
            // Limit exceeded: remember it, then emit and drop what was collected.
            done.update(true);
            return TriggerResult.FIRE_AND_PURGE;
        }

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        }

        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires when the timer for the end of the window goes off; timers for any
     * other timestamp are ignored.
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
            TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE
                : TriggerResult.CONTINUE;
    }

    /** Processing-time timers are not used by this trigger. */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Releases everything this trigger holds for the window: the end-of-window
     * timer and the "done" flag.
     *
     * @throws Exception if the state cannot be accessed
     */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        // Without this the flag of every short-circuited window would be left behind.
        ctx.getPartitionedState(doneStateDescriptor).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    /**
     * Registers the end-of-window timer for the merged window.
     *
     * <p>NOTE(review): the "done" flag of the windows being merged is not
     * transferred here; see the class-level note.
     */
    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "BoundedEventTimeTrigger(" + maxSessionTime + ")";
    }

}

Reply via email to