I guess
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
is where we could control what is emitted. Again, for us it seems natural to
use the WM to materialize micro batches with "approximate" order (and no, I am
not a fan of Spark micro batches :)). Any pointers as to how we could write an
implementation that allows for "up till WM emission" through a trigger on a
Session Window would be very helpful. In essence, I believe it is crucial for
any "funnel" analysis. Something like
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346

I know I am simplifying this and there has to be more to it...

On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> The Trigger in this case would be some count-based Trigger.... Again, the
> motive is to keep the state lean, as we want to search for patterns,
> sorted on event time, in the incoming sessionized (and thus of
> non-deterministic length) stream....
>
> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> For example, this would have worked perfectly if it did not complain about
>> MergeableWindow and state. The Session class here encapsulates the trim
>> up to watermark behavior (a reduce call after telling it the current WM)
>> that we desire.
>>
>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>
>>     // Descriptor for the per-window state that holds the running Session.
>>     private static final ValueStateDescriptor<Session> sessionState =
>>             new ValueStateDescriptor<>("session", Session.class);
>>
>>     @Override
>>     public void process(String key, Context context, Iterable<Event> elements,
>>             Collector<Session> out) throws Exception {
>>
>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>         Session s = session.value() != null ? session.value() : new Session();
>>         for (Event e : elements) {
>>             s.add(e);
>>         }
>>         // Tell the session the current watermark, then trim up to it.
>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>         s.reduce();
>>         out.collect(s);
>>         session.update(s);
>>     }
>>
>>     @Override
>>     public void clear(Context context) {
>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>         session.clear();
>>     }
>> }
>>
>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Hello Fabian, thank you for the response.
>>>
>>> I think that does not work, as it is the WM of the Window Operator that
>>> is needed to make deterministic decisions, rather than that of an operator
>>> that precedes the Window? This is doable using ProcessWindowFunction with
>>> state, but only in the case of non-mergeable windows.
>>>
>>> The best API option, I think, is a TimeBaseTrigger that fires on every
>>> configured time progression of the WM and a Window implementation that
>>> materializes *only data up till that WM* (it might have more data, but
>>> that data has event time greater than the WM). I am not sure we have that
>>> as a built-in option, and thus I was asking for access to the current WM
>>> of the window operator to allow us to handle the "*only data up till that
>>> WM*" range retrieval using some custom data structure.
>>>
>>> Regards.
>>>
>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> the Trigger is not designed to augment records but just to control when
>>>> a window is evaluated.
>>>> I would recommend using a ProcessFunction to enrich records with the
>>>> current watermark before passing them into the window operator.
>>>> The context object of the processElement() method gives access to the
>>>> current watermark and timestamp of a record.
>>>>
>>>> Please note that watermarks are not deterministic but may depend on the
>>>> order in which parallel inputs are consumed by an operator.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> An addendum
>>>>>
>>>>> Is the element reference IN in onElement(IN element..) in
>>>>> Trigger<IN,..> the same as the IN provided to add(IN value) in
>>>>> Accumulator<IN,..>? It seems that any mutation of IN in onElement() is
>>>>> not visible to the Accumulator, which carries it as the previous element
>>>>> reference, in the next invocation of add(). This seems to happen only in
>>>>> distributed mode, which makes sense only if these references point to
>>>>> different objects.
>>>>>
>>>>> The pipeline
>>>>>
>>>>> .keyBy(keySelector)
>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>> .aggregate(
>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>
>>>>>         @Override
>>>>>         public ACC createAccumulator() {
>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>             newInstance.resetLocal();
>>>>>             return newInstance;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void add(IN value, ACC accumulator) {
>>>>>             /** This method is called before onElement of the Trigger
>>>>>              and keeps the reference to the last IN **/
>>>>>             accumulator.add(value);
>>>>>         }
>>>>>         .....
>>>>>
>>>>> The Trigger
>>>>>
>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W
>>>>>         extends Window> extends Trigger<T, W> {
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>             TriggerContext ctx) throws Exception {
>>>>>
>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>
>>>>>         .
>>>>>
>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> I want to augment a POJO in the Trigger's onElement method, specifically
>>>>>> to supply the POJO with the watermark from the TriggerContext. The
>>>>>> sequence of execution is:
>>>>>>
>>>>>> 1. call add() on the accumulator for the window and save the
>>>>>>    POJO reference in the Accumulator
>>>>>> 2. call onElement on the Trigger
>>>>>> 3. set the watermark on the POJO
>>>>>>
>>>>>> The next add() call should see the last reference and any mutation
>>>>>> done in step 3.
>>>>>>
>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that I
>>>>>> have access to the mutation made by onElement() to the POJO in the
>>>>>> subsequent add(), but not on a distributed cluster. The specific
>>>>>> question I had is whether add() on a supplied accumulator on a window
>>>>>> and the onElement() method of the trigger on that window are inline
>>>>>> executions on the same thread, or whether there is any
>>>>>> serialization/deserialization or IPC that causes this divergence
>>>>>> (local versus distributed).
>>>>>>
>>>>>> Regards.
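
On the original question above (whether add() and onElement() see the same object), here is a minimal plain-Java sketch, not Flink code, of how a mutation can be visible when the accumulator is held by reference yet lost once the accumulator passes through serialization between the two calls. It assumes, without confirmation from this thread, that the distributed setup keeps window state in serialized form while the local test keeps it on the heap. Event, Acc, roundTrip and the two watermarkSeen... methods are hypothetical names introduced only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ReferenceVsCopyDemo {

    /** Hypothetical stand-in for the POJO that should carry the watermark. */
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long waterMark = Long.MIN_VALUE;
    }

    /** Hypothetical stand-in for the accumulator that keeps the last element. */
    static class Acc implements Serializable {
        private static final long serialVersionUID = 1L;
        Event last;

        void add(Event e) {
            last = e;
        }
    }

    /** Deep-copies a value by writing it out and reading it back (assumed storage step). */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException("round trip failed", ex);
        }
    }

    /** Accumulator held by reference: the later mutation is visible through it. */
    static long watermarkSeenByReference(long wm) {
        Acc acc = new Acc();
        Event e = new Event();
        acc.add(e);        // step 1: add() stores the reference
        e.waterMark = wm;  // steps 2-3: the trigger mutates the same object
        return acc.last.waterMark;
    }

    /** Accumulator serialized after add(): the mutation lands on a different object. */
    static long watermarkSeenAfterCopy(long wm) {
        Acc acc = new Acc();
        Event e = new Event();
        acc.add(e);                   // step 1: add() stores the reference
        Acc stored = roundTrip(acc);  // assumed: state is written out in serialized form
        e.waterMark = wm;             // steps 2-3: mutation hits the original only
        return stored.last.waterMark; // the stored copy never saw the watermark
    }

    public static void main(String[] args) {
        System.out.println("by reference: " + watermarkSeenByReference(42L));
        System.out.println("after copy:   " + watermarkSeenAfterCopy(42L));
    }
}
```

If that assumption holds, the mutation would have to travel with the record itself, along the lines of the ProcessFunction enrichment suggested earlier in the thread, rather than rely on the trigger and the accumulator sharing an object.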