Hello Fabian,

Thank you for the response. I do not think that works: what is needed to make deterministic decisions is the watermark (WM) of the window operator itself, not the watermark seen by an operator that precedes the window. This is doable with a ProcessWindowFunction that uses state, but only for non-mergeable windows.

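To make the state-based variant concrete: what the window would need to keep is a buffer ordered by event time, from which only the elements at or below the operator's watermark are read. Below is a minimal plain-Java sketch with no Flink types involved. `WatermarkBoundedBuffer`, `add`, `upTo` and `pendingAfter` are names invented here for illustration, and the watermark is simply passed in as a `long`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical helper, not a Flink class: buffers values ordered by event time. */
class WatermarkBoundedBuffer<T> {
    // event timestamp -> values that arrived with that timestamp
    private final NavigableMap<Long, List<T>> byEventTime = new TreeMap<>();

    /** Buffers a value under its event timestamp. */
    void add(long eventTime, T value) {
        byEventTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(value);
    }

    /** Returns, in event-time order, all values with eventTime <= watermark. */
    List<T> upTo(long watermark) {
        List<T> result = new ArrayList<>();
        for (List<T> values : byEventTime.headMap(watermark, true).values()) {
            result.addAll(values);
        }
        return result;
    }

    /** Counts buffered values whose event time is still ahead of the watermark. */
    int pendingAfter(long watermark) {
        int count = 0;
        for (List<T> values : byEventTime.tailMap(watermark, false).values()) {
            count += values.size();
        }
        return count;
    }
}
```

In a real job such a buffer would have to live in window or keyed state, and the value passed to `upTo` would have to be the window operator's own watermark, which is exactly the piece I am asking about.
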
The best API option, I think, is a TimeBaseTrigger that fires on every configured progression of the WM, together with a window implementation that materializes *only data up to that WM* (the window may hold more data, but that data has an event time greater than the WM). I am not sure we have such a built-in option, which is why I was asking for access to the current WM of the window operator: it would let us handle the "*only data up to that WM*" range retrieval with some custom data structure.

Regards.

On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vishal,
>
> the Trigger is not designed to augment records but just to control when a
> window is evaluated.
> I would recommend using a ProcessFunction to enrich records with the
> current watermark before passing them into the window operator.
> The context object of the processElement() method gives access to the
> current watermark and timestamp of a record.
>
> Please note that watermarks are not deterministic but may depend on the
> order in which parallel inputs are consumed by an operator.
>
> Best, Fabian
>
> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> An addendum
>>
>> Is the element reference IN in onElement(IN element.. ) in
>> Trigger<IN,..> the same as the IN provided to add(IN value) in
>> Accumulator<IN,..>? It seems that any mutation to IN in onElement() is
>> not visible to the Accumulator that is carrying it as a previous element
>> reference, albeit in the next invocation of add(). This seems to happen
>> only in distributed mode, which makes sense only if these references
>> point to different objects.
>>
>> The pipeline
>>
>> .keyBy(keySelector)
>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>> .aggregate(
>>     new AggregateFunction<IN, ACC, OUT>() {
>>
>>         @Override
>>         public ACC createAccumulator() {
>>             ACC newInstance = (ACC) accumulator.clone();
>>             newInstance.resetLocal();
>>             return newInstance;
>>         }
>>
>>         @Override
>>         public void add(IN value, ACC accumulator) {
>>
>>             /** This method is called before onElement of the Trigger
>>                 and keeps the reference to the last IN **/
>>
>>             accumulator.add(value);
>>
>>         }
>>         .....
>>
>> The Trigger
>>
>> public class CountBasedWMAugmentationTrigger<T extends
>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W
>>         extends Window> extends Trigger<T, W> {
>>
>>     @Override
>>     public TriggerResult onElement(T element, long timestamp, W window,
>>             TriggerContext ctx) throws Exception {
>>
>>         /** The element T is mutated to carry the watermark **/
>>         element.setWaterMark(ctx.getCurrentWatermark());
>>
>>         .
>>
>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I want to augment a POJO in the Trigger's onElement method, specifically
>>> to supply the POJO with the watermark from the TriggerContext. The
>>> sequence of execution is:
>>>
>>> 1. call to add() in the accumulator for the window and save the POJO
>>> reference in the Accumulator.
>>> 2. call to onElement on the Trigger
>>> 3. set watermark to the POJO
>>>
>>> The next add() method should have the last reference and any mutation
>>> done in step 3.
>>>
>>> That works in a local test case, using LocalFlinkMiniCluster, as in I
>>> have access to the mutation by the onElement() in the POJO in the
>>> subsequent add(), but not on a distributed cluster.
>>> The specific question I had is whether add() on a supplied accumulator
>>> on a window and the onElement() method of the trigger on that window are
>>> inline executions on the same thread, or whether there is any
>>> serialization/deserialization IPC that causes this divergence (local
>>> versus distributed).
>>>
>>> Regards.
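
The local-versus-distributed divergence asked about in the original question can be reproduced outside Flink. Whether this is what actually happens inside the window operator is not confirmed anywhere in this thread. The sketch below only shows the general effect, assuming the accumulator (and the element reference it holds) goes through a serialize/deserialize round trip between calls. `Event`, `ReferenceVsCopy` and `roundTrip` are hypothetical names, and plain Java serialization stands in for whatever serializer a real deployment would use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for the POJO that carries the watermark. */
class Event implements Serializable {
    private static final long serialVersionUID = 1L;
    long waterMark = Long.MIN_VALUE;
}

class ReferenceVsCopy {
    /** Deep-copies an object through Java serialization (assumed stand-in for a state round trip). */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Event element = new Event();
        Event sameReference = element;    // a live object kept on the same heap
        Event copy = roundTrip(element);  // the same element after serialization

        element.waterMark = 42L;          // later mutation, as onElement() would do

        System.out.println("shared reference sees mutation: " + (sameReference.waterMark == 42L));
        System.out.println("deserialized copy sees mutation: " + (copy.waterMark == 42L));
    }
}
```

If a copy of this kind is made anywhere between add() and onElement(), the mutation in step 3 would land on a different object than the one the accumulator holds, which would match the behaviour seen on the cluster.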