For example, this would have worked perfectly if Flink did not complain about MergeableWindow and state. The Session class in this example encapsulates the trim-up-to-watermark behavior we want (a reduce() call after telling it the current WM):
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class SessionProcessWindow
            extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

        // Per-window state holding the Session accumulated so far
        private static final ValueStateDescriptor<Session> sessionState =
                new ValueStateDescriptor<>("session", Session.class);

        @Override
        public void process(String key, Context context, Iterable<Event> elements,
                            Collector<Session> out) throws Exception {
            ValueState<Session> session = context.windowState().getState(sessionState);
            Session stored = session.value();
            Session s = stored != null ? stored : new Session();
            for (Event e : elements) {
                s.add(e);
            }
            // Tell the Session the window operator's current watermark, then trim
            s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
            s.reduce();
            out.collect(s);
            session.update(s);
        }

        @Override
        public void clear(Context context) {
            // Release the per-window state when the window is purged
            context.windowState().getState(sessionState).clear();
        }
    }

On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Hello Fabian,
>
> Thank you for the response.
>
> I think that does not work, as it is the WM of the window operator that we
> need in order to make deterministic decisions, rather than the WM of an
> operator that precedes the window. This is doable with a
> ProcessWindowFunction and state, but only for non-mergeable windows.
>
> The best API option, I think, is a time-based Trigger that fires on every
> configured progression of the WM, together with a Window implementation
> that materializes *only data up till that WM* (the window might hold more
> data, but that data has an event time greater than the WM). I am not sure
> such an option is built in, which is why I was asking for access to the
> current WM of the window operator: it would let us handle the "*only data
> up till that WM*" range retrieval with a custom data structure.
>
> Regards.
>
> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> the Trigger is not designed to augment records but just to control when a
>> window is evaluated.
>>
>> I would recommend using a ProcessFunction to enrich records with the
>> current watermark before passing them into the window operator.
>> The context object of the processElement() method gives access to the
>> current watermark and the timestamp of a record.
>>
>> Please note that watermarks are not deterministic but may depend on the
>> order in which parallel inputs are consumed by an operator.
>>
>> Best, Fabian
>>
>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> An addendum:
>>>
>>> Is the element reference IN in onElement(IN element, ...) of
>>> Trigger<IN, ...> the same object as the IN provided to add(IN value) of
>>> the accumulator (AggregateFunction<IN, ...>)? It seems that any mutation
>>> of IN in onElement() is not visible to the accumulator that holds it as a
>>> reference to the previous element, when it is read in the next invocation
>>> of add(). This seems to happen only in distributed mode, which makes sense
>>> only if these references point to different objects.
>>>
>>> The pipeline
>>>
>>>     .keyBy(keySelector)
>>>     .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>     .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>     .aggregate(
>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>
>>>             @Override
>>>             public ACC createAccumulator() {
>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>                 newInstance.resetLocal();
>>>                 return newInstance;
>>>             }
>>>
>>>             @Override
>>>             public void add(IN value, ACC accumulator) {
>>>                 /** This method is called before onElement of the Trigger
>>>                     and keeps the reference to the last IN **/
>>>                 accumulator.add(value);
>>>             }
>>>             .....
>>>
>>> The Trigger
>>>
>>>     public class CountBasedWMAugmentationTrigger<T extends Serializable
>>>             & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
>>>             extends Trigger<T, W> {
>>>
>>>         @Override
>>>         public TriggerResult onElement(T element, long timestamp, W window,
>>>                 TriggerContext ctx) throws Exception {
>>>             /** The element T is mutated to carry the watermark **/
>>>             element.setWaterMark(ctx.getCurrentWatermark());
>>>             .
>>>
>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> I want to augment a POJO in the Trigger's onElement() method, specifically
>>>> to supply the POJO with the watermark from the TriggerContext. The
>>>> sequence of execution is:
>>>>
>>>> 1. add() is called on the window's accumulator, which saves the POJO
>>>>    reference in the accumulator.
>>>> 2. onElement() is called on the Trigger.
>>>> 3. The watermark is set on the POJO.
>>>>
>>>> The next add() call should see the last reference, including any
>>>> mutation done in step 3.
>>>>
>>>> That works in a local test case using LocalFlinkMiniCluster (I can see
>>>> the mutation made by onElement() on the POJO in the subsequent add()),
>>>> but not on a distributed cluster. My specific question is whether add()
>>>> on the window's accumulator and the onElement() method of the trigger on
>>>> that window execute inline on the same thread, or whether some
>>>> serialization/deserialization or IPC causes this divergence (local versus
>>>> distributed).
>>>>
>>>> Regards.
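One plausible explanation for the local-versus-distributed difference, offered as an assumption rather than a confirmed diagnosis: if the distributed job keeps window state serialized (as the RocksDB state backend does), add() works on a freshly deserialized accumulator that is written back as bytes, so a mutation that onElement() makes to the element afterwards never reaches the stored copy. A heap-based store keeps the object reference, so the mutation is visible. This plain-Java sketch simulates both storage modes; `Pojo`, `Acc`, and `roundTrip` are hypothetical stand-ins, not Flink APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MutationVisibilityDemo {

    // Hypothetical stand-in for the user's POJO that carries a watermark.
    static class Pojo implements Serializable {
        long waterMark = Long.MIN_VALUE;
    }

    // Hypothetical accumulator that keeps a reference to the last element.
    static class Acc implements Serializable {
        Pojo last;
        void add(Pojo p) { last = p; }
    }

    // Deep copy via Java serialization, mimicking a backend that stores state as bytes.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Heap-style storage: the stored accumulator is the same object add() touched.
    static long heapStore(long wm) {
        Acc stored = new Acc();
        Pojo element = new Pojo();
        stored.add(element);           // step 1: add() keeps the reference
        element.waterMark = wm;        // steps 2-3: the trigger mutates the element
        return stored.last.waterMark;  // same object, so the mutation is visible
    }

    // Serialized storage: after add(), only a byte copy of the accumulator is kept.
    static long serializedStore(long wm) {
        Acc acc = new Acc();
        Pojo element = new Pojo();
        acc.add(element);
        Acc stored = roundTrip(acc);   // state written back as a serialized copy
        element.waterMark = wm;        // mutation happens after the copy was taken
        return stored.last.waterMark;  // the copy still holds the old value
    }

    public static void main(String[] args) {
        System.out.println("heap:       " + heapStore(42L));
        System.out.println("serialized: " + serializedStore(42L));
    }
}
```

If this is the cause, relying on a mutation after add() is fragile regardless of backend; carrying the watermark in the record before it reaches the window, as Fabian suggests, avoids depending on object identity.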
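The "*only data up till that WM*" retrieval Vishal describes, which is also the trim behavior attributed to the Session class's reduce(), can be illustrated with a small buffer ordered by event time. This is a hypothetical plain-Java sketch: it is not the Session class from the thread (whose code is not shown) and not a Flink API, and `WatermarkBuffer`, `add`, `drainUpTo`, and `pending` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical buffer that releases only events whose event time is <= a watermark.
public class WatermarkBuffer<T> {

    // Events grouped by event timestamp, kept in ascending time order.
    private final TreeMap<Long, List<T>> byTime = new TreeMap<>();

    public void add(long eventTime, T event) {
        byTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    // Removes and returns, in event-time order, every event with time <= watermark.
    // Events later than the watermark stay buffered for a future call.
    public List<T> drainUpTo(long watermark) {
        NavigableMap<Long, List<T>> ready = byTime.headMap(watermark, true);
        List<T> out = new ArrayList<>();
        for (List<T> events : ready.values()) {
            out.addAll(events);
        }
        ready.clear(); // headMap is a view, so this also removes them from byTime
        return out;
    }

    // Number of events still waiting for the watermark to pass them.
    public int pending() {
        int n = 0;
        for (List<T> events : byTime.values()) {
            n += events.size();
        }
        return n;
    }

    public static void main(String[] args) {
        WatermarkBuffer<String> buf = new WatermarkBuffer<>();
        buf.add(5L, "b");
        buf.add(1L, "a");
        buf.add(9L, "c");
        System.out.println(buf.drainUpTo(5L)); // [a, b]
        System.out.println(buf.pending());     // 1
    }
}
```

A sorted map keeps the "up to the watermark" cut a single range operation; events that arrive with a later event time simply wait until a later watermark releases them.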
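Fabian's remark that watermarks may depend on the order in which parallel inputs are consumed follows from how an operator with several input channels derives its watermark: it is the minimum of the latest watermarks received on each channel, and it never moves backwards. The simplified plain-Java model below (the class `InputWatermarks` is hypothetical and ignores details such as idle channels) replays the same watermarks in two interleavings and shows a record observing different operator watermarks.

```java
import java.util.Arrays;

// Simplified model: the operator watermark is the minimum of the latest
// watermark seen on each input channel, and it only moves forward.
public class InputWatermarks {

    private final long[] channelWatermarks;
    private long operatorWatermark = Long.MIN_VALUE;

    public InputWatermarks(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator watermark.
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        operatorWatermark = Math.max(operatorWatermark, min);
        return operatorWatermark;
    }

    public long current() {
        return operatorWatermark;
    }

    public static void main(String[] args) {
        // Order A: channel 1's watermark 20 arrives before the record.
        InputWatermarks a = new InputWatermarks(2);
        a.onWatermark(0, 10L);
        a.onWatermark(1, 5L);
        a.onWatermark(0, 30L);
        a.onWatermark(1, 20L);
        System.out.println("record sees " + a.current()); // 20

        // Order B: the record is processed before channel 1 reports 20.
        InputWatermarks b = new InputWatermarks(2);
        b.onWatermark(0, 10L);
        b.onWatermark(1, 5L);
        b.onWatermark(0, 30L);
        System.out.println("record sees " + b.current()); // 5
    }
}
```

Any per-record decision keyed on "the current watermark" can therefore differ between runs, which is worth keeping in mind whether the watermark is read in a ProcessFunction or inside the window operator.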