Hi,

you would not need the ListStateDescriptor. When you use a ProcessWindowFunction, the window operator stores all events that are assigned to a window (IN objects in your case) in an internal ListState. The Iterable<IN> parameter of the process() method iterates over this internal list state.
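Because process() receives every buffered element through the Iterable, it can rebuild its result from scratch on each firing without any extra state. The following is a minimal, Flink-independent sketch of the filter-and-sort step described in this reply. It assumes a hypothetical Event type with a getTimestamp() accessor; inside a real ProcessWindowFunction, the watermark argument would come from context.currentWatermark().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Flink-independent sketch of the per-firing work of a ProcessWindowFunction.
final class WatermarkSplit {

    // Hypothetical stand-in for the IN type (which implements HasTime in the thread).
    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        long getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return name + "@" + timestamp;
        }
    }

    // Returns the buffered events with timestamp <= watermark, sorted by event time.
    // Nothing is removed from 'buffered'; later events simply stay in the window state.
    static List<Event> readyUpTo(Iterable<Event> buffered, long watermark) {
        List<Event> ready = new ArrayList<>();
        for (Event e : buffered) {
            if (e.getTimestamp() <= watermark) {
                ready.add(e);
            }
        }
        // ArrayList.sort uses an adaptive merge sort that needs few comparisons
        // on partially sorted input, which suits "almost sorted" events.
        ready.sort(Comparator.comparingLong(Event::getTimestamp));
        return ready;
    }

    public static void main(String[] args) {
        List<Event> buffered = List.of(
                new Event("c", 30), new Event("a", 10), new Event("d", 45), new Event("b", 20));
        System.out.println(readyUpTo(buffered, 30)); // prints [a@10, b@20, c@30]
    }
}
```

Events newer than the watermark are left untouched in the window state, which is why intermediate firings would return FIRE rather than FIRE_AND_PURGE.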
So you would have a Trigger that fires when a new watermark is received (or at regular intervals, like every minute) and at the end of the window. The process() method looks up the current watermark in the Context object, traverses the Iterable<IN> while filtering out all events with timestamp > watermark (you would need to enrich the events with their timestamp, which can be done in a ProcessFunction), inserts the remaining ones into a sorted data structure (possibly leveraging the almost-sorted nature of the events), and creates a Session from it.

This is probably less efficient than your ProcessFunction, because process() goes over the complete list again on every firing and cannot persist the result of previous invocations. However, the code should be easier to maintain.

Does that make sense?

Best, Fabian

2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
> Hello Fabian, thank you for your response.
>
> I thought about it and may be missing something obvious here. The code below is what I think you suggest. The issue is that the window state is now a list of Session objects (or rather subsets of a Session).
>
> What is required is that on a new watermark
>
> * we sort these Session objects
> * get the subset that is before the new watermark and emit it without purging.
>
> I do not see how the Trigger approach helps us. It does tell us that the watermark has progressed, but to get the subset of the ListState that falls before the watermark, we would need access to *the new value of the watermark*. That was what my initial query was about.
>
> public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>>
>         extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>
>     OUT toCreateNew;
>     Long gap;
>     private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;
>
>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType, OUT toCreateNew) {
>         this.toCreateNew = toCreateNew;
>         mergingSetsStateDescriptor = new ListStateDescriptor<>("sessions", aggregationResultType);
>     }
>
>     @Override
>     public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
>         OUT session = toCreateNew.createNew();
>         elements.forEach(f -> session.add(f));
>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>     }
> }
>
> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> thanks for sharing your solution!
>>
>> Looking at this issue again and at the mail in which you shared your SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the ValueState that prevents the ProcessWindowFunction from being used in a mergeable window.
>> You could create a new Session object in each invocation of the ProcessWindowFunction and simply keep the elements in the (mergeable) list state of the window.
>> In that case you would simply need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls you return FIRE, and for the final call you return FIRE_AND_PURGE to remove the elements from the window's state.
>> Did you try that?
>>
>> Best, Fabian
>>
>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Dear Fabian,
>>>
>>> I was able to create a pretty functional ProcessFunction; here is the synopsis. Please see if it makes sense.
>>>
>>> Sessionization is unique in that it entails windows of dynamic length.
>>> The way Flink approaches it is pretty simple. It creates a TimeWindow of size "gap" relative to the event time, finds overlapping windows (intersection) and creates a covering window. Each such window has a "state" associated with it, which also has to be merged when a covering window is created from the intersection of two or more incident windows. To be more precise, if Window1 spans (t1, t2), a new record creates a window (t3, t4), and t1 <= t3 <= t2, then a new window (t1, max(t2, t4)) is created and the associated states are merged.
>>>
>>> In the current Window API the states are external and accumulator based. This approach works for pretty much all cases where the aggregation is accumulative/reducing and does not depend on order, i.e. no ordered list of incoming records needs to be kept and the reduction is to a single aggregated element (think counts, min, max, etc.). In path analysis (and other use cases), however, this approach has drawbacks. Even though our accumulator could keep an ordered list of events, that list becomes unreasonably large if it is not bounded. An approach that does *attempt* to bound state is to analyze paths preemptively, using the WM as the marker that defines the *subset* of the state that is safe to analyze. So if we have n events in the window state and m of them fall before the WM, we can safely analyze that subset of m events, emitting the paths seen and reducing the cumulative state size. There are caveats, though, which I will go into later.
>>>
>>> Unfortunately, the accumulators in Flink's default window runtime do not have access to the WM.
>>>
>>> This led to the following generic approach (implemented and tested):
>>>
>>> * Use a low-level ProcessFunction, which allows access to the WM and is definitely nearer to the guts of Flink.
>>>
>>> * Still use the merge-windows-on-intersection approach, but use the WM to trigger (through timers) reductions in state.
>>>   This is not very different from what Flink does, but we have more control over what to do and when to do it. Essentially, we have exposed a lifecycle method that reacts to WM progression.
>>>
>>> * There are essentially two timers. The first timer is at the maxTimestamp() of a window; if there is no further mutation (because of a merge etc.), it fires to signal the end of the session. The second one is at currentWatermark + 1 and essentially calls a "reduceToWM" on each keyed window and thus its state.
>>>
>>> * There are two ways to short-circuit a session: 1. on session time span, 2. on session size.
>>>
>>> * There is a safety valve to blacklist keys when it is obvious that the key is a bot (again
>>>
>>> The solution thus preemptively pushes out patterns (and correct patterns) while keeping the ordered state within reasonable bounds. The incident data of course has to be analyzed: are the paths too large, etc. But one has full control over how to fashion the solution.
>>>
>>> Regards and Thanks,
>>>
>>> Vishal
>>>
>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> This makes sense. Thanks.
>>>>
>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> all calls to processElement() and onTimer() are synchronized, for all keys. Think of a single thread calling these methods.
>>>>> Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order in which records (actual records or watermarks) arrive at the function. Actual synchronization is only required for processing-time timers.
>>>>>
>>>>> The NPE might be thrown because two timers fire one after the other without a new record being processed in between the onTimer() calls.
>>>>> In that case the state is cleared in the first call and is null in the second.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> I have a few follow-up questions regarding ProcessFunction. I think that the core should take care of any synchronization issues between calls to onElement and onTimer in the case of a keyed stream, but tests do not seem to suggest that.
>>>>>>
>>>>>> I have specifically two questions.
>>>>>>
>>>>>> 1. Are calls to onElement(..) single-threaded if scoped to a key? As in, on a keyed stream, is there a way that two or more threads can execute on more than one element of a single key at one time? Would I have to synchronize this construction:
>>>>>>
>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>     if (accumulator == null) {
>>>>>>         accumulator = acc.createNew();
>>>>>>     }
>>>>>>
>>>>>> 2. Can calls to onTimer(..) and onElement(..) happen concurrently for the same key? I intend to clean up state, but I see NullPointerExceptions thrown in onTimer(..), and I presume it is because onElement and onTimer are executed on two separate threads, with onTimer removing the state (clear()) after another thread has registered a timer (in onElement).
>>>>>>
>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>         accumulatorState.clear();
>>>>>>     }
>>>>>>
>>>>>> PS. This is the full code.
>>>>>>
>>>>>>     @Override
>>>>>>     public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>>>>         TimerService timerService = context.timerService();
>>>>>>         if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>             OUT accumulator = accumulatorState.value();
>>>>>>             if (accumulator == null) {
>>>>>>                 accumulator = acc.createNew();
>>>>>>             }
>>>>>>             accumulator.setLastModified(context.timestamp());
>>>>>>             accumulatorState.update(accumulator);
>>>>>>             timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>         if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>             accumulatorState.clear();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> That's correct. Removal of timers is not supported in ProcessFunction. I'm not sure why it is supported for Triggers.
>>>>>>> The common workaround for ProcessFunctions is to register multiple timers and keep a ValueState that stores the valid timestamp at which the onTimer method should be executed.
>>>>>>> When a timer fires and calls onTimer(), the method first checks whether the timestamp is the valid one and returns immediately if that is not the case.
>>>>>>> If you want to fire on the next watermark, another trick is to register timers on (currentWatermark + 1). Since there is at most one timer per key and timestamp, and (currentWatermark + 1) does not change until the watermark advances, repeated registrations all refer to the same timer. That timer is called when the watermark advances.
>>>>>>>
>>>>>>> On the performance of the timer service:
>>>>>>> AFAIK, all methods that work with some kind of timer use this service, so there is not much choice.
>>>>>>>
>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> And that further raises the question: how performant is the timer service? I tried to peruse the architecture behind it but could not find a definite clue. Is it a scheduled service, and if so, how many threads does it use?
>>>>>>>>
>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Makes sense. I did a first stab at using ProcessFunction. The TimerService exposed by the Context does not have a method to remove a timer. Is that primarily because a priority queue is the storage, and removal from a PriorityQueue is expensive? The Trigger context does expose a version with removal abilities, so I was wondering why this dissonance...
>>>>>>>>>
>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vishal,
>>>>>>>>>>
>>>>>>>>>> it is not guaranteed that add() and onElement() receive the same object, and even if they do, it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
>>>>>>>>>> Hence, elements should not be modified in onElement().
>>>>>>>>>>
>>>>>>>>>> Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
>>>>>>>>>> This might be more code, but it is easier to design and reason about because there is no interaction of window assigner, trigger, and window function.
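Fabian's suggestion to implement the whole operation in a ProcessFunction can be combined with the workaround from his later reply of Dec 21 (keep the valid timer timestamp in state and ignore other timers in onTimer()) and with a null check for the NPE case he explains on Dec 23. The following is a single-key, Flink-independent simulation under those assumptions; the class and method names are hypothetical, and in a real KeyedProcessFunction the fields would be a ValueState and the context's TimerService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Flink-independent simulation of one key's state in a session ProcessFunction.
// The fields stand in for ValueState and the TimerService; as in Flink, all
// callbacks for a key run on a single thread, one after another.
final class GapSessionSimulation {
    private final long gap;
    private Long lastModified;                            // ValueState; null means "cleared"
    private final TreeSet<Long> timers = new TreeSet<>(); // at most one timer per timestamp
    private long watermark = Long.MIN_VALUE;
    final List<Long> closedSessions = new ArrayList<>();  // last event time of each closed session

    GapSessionSimulation(long gap) {
        this.gap = gap;
    }

    void processElement(long timestamp) {
        if (timestamp <= watermark) {
            return; // late element: dropped, as in the code from this thread
        }
        // Keep the maximum so an out-of-order element cannot shorten the session.
        lastModified = (lastModified == null) ? timestamp : Math.max(lastModified, timestamp);
        // Older timers cannot be deleted; they become stale and are ignored in onTimer().
        timers.add(timestamp + gap);
    }

    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        // Event-time timers fire, in timestamp order, once the watermark passes them.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        if (lastModified == null) {
            return; // defensive null check: state was already cleared by an earlier timer
        }
        if (timestamp != lastModified + gap) {
            return; // stale timer: a later element extended the session
        }
        closedSessions.add(lastModified);
        lastModified = null; // corresponds to accumulatorState.clear()
    }

    public static void main(String[] args) {
        GapSessionSimulation s = new GapSessionSimulation(10);
        s.processElement(1);
        s.processElement(5);
        s.processElement(3);    // out of order; timers now at 11, 13 and 15
        s.advanceWatermark(30); // 11 and 13 are stale, 15 closes the session
        s.processElement(40);
        s.advanceWatermark(60); // timer 50 closes the second session
        System.out.println(s.closedSessions); // prints [5, 40]
    }
}
```

Because lastModified keeps the maximum timestamp here, the session-ending timer is always the last pending one for that session, so the null check is purely defensive. In the code quoted in this thread, which overwrote lastModified with each element's timestamp, an out-of-order element could make an earlier timer clear the state while a later timer was still pending.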
>>>>>>>>>>
>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>
>>>>>>>>>>> is where we could fashion what is emitted. Again, for us it seems natural to use the WM to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers as to how we could write an implementation that allows for "up till WM" emission through a trigger on a session window would be very helpful. In essence, I believe this is crucial for any "funnel" analysis.
>>>>>>>>>>>
>>>>>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>
>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The Trigger in this case would be some count-based trigger. Again, the motive is to keep the state lean, as we want to search for patterns, sorted on event time, in the incoming sessionized (and thus of nondeterministic length) stream.
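A count-based trigger on session windows has to cope with merging: when two windows merge, their pending counts must be combined. The following is a Flink-independent model of that decision logic with hypothetical names; Flink's built-in CountTrigger follows the same pattern, keeping the count in a ReducingState that it merges in onMerge(). Note that setting a custom trigger replaces the session window's default event-time trigger, so a real trigger for this use case would also need a timer at window.maxTimestamp() to detect the end of a session.

```java
// Flink-independent model of a count-based trigger on merging (session) windows.
final class CountFiringModel {
    private final long maxCount;
    private long count;

    CountFiringModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Returns true when the window should FIRE (without purging) for this element.
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // reset the counter only; the window contents stay in state
            return true;
        }
        return false;
    }

    // When two session windows merge, their pending counts are added up.
    static CountFiringModel merge(CountFiringModel a, CountFiringModel b) {
        CountFiringModel merged = new CountFiringModel(a.maxCount);
        merged.count = a.count + b.count;
        return merged;
    }

    long pending() {
        return count;
    }

    public static void main(String[] args) {
        CountFiringModel m = new CountFiringModel(3);
        int fires = 0;
        for (int i = 0; i < 7; i++) {
            if (m.onElement()) {
                fires++;
            }
        }
        System.out.println(fires + " firings, " + m.pending() + " pending"); // prints 2 firings, 1 pending
    }
}
```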
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class here encapsulates the trim-up-to-watermark behavior we desire (a reduce call after telling it the current WM).
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Fabian, thank you for the response.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think that does not work, as it is the WM of the window operator that is needed to make deterministic decisions, rather than the WM of an operator that precedes the window. This is doable with a ProcessWindowFunction using state, but only in the case of non-mergeable windows.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The best API option, I think, is a time-based trigger that fires on every configured progression of the WM, plus a window implementation that materializes *only data up till that WM* (the window might hold more data, but that data has an event time greater than the WM). I am not sure we have such a built-in option, and thus was asking for access to the current WM of the window operator, so that we can handle the "*only data up till that WM*" range retrieval using some custom data structure.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the Trigger is not designed to augment records but only to control when a window is evaluated.
>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
>>>>>>>>>>>>>>> The context object of the processElement() method gives access to the current watermark and to the timestamp of a record.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.
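Fabian's recommendation (enrich records upstream instead of mutating them in a Trigger) can be modeled as below. This is a Flink-independent sketch with hypothetical names; in an actual ProcessFunction, processElement() would read ctx.timestamp() and ctx.timerService().currentWatermark() and collect a new record. As Vishal points out in his reply, the watermark attached this way is the upstream operator's watermark at arrival time, not the window operator's watermark at firing time.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-independent model of an upstream ProcessFunction that attaches the
// watermark current at arrival time to each record, emitting a NEW object
// instead of mutating the input.
final class WatermarkEnricher {

    // Hypothetical immutable output type: event time plus watermark at arrival.
    static final class Enriched {
        final long timestamp;
        final long watermarkAtArrival;

        Enriched(long timestamp, long watermarkAtArrival) {
            this.timestamp = timestamp;
            this.watermarkAtArrival = watermarkAtArrival;
        }

        // True if the record was already behind the watermark when it arrived.
        boolean wasLate() {
            return timestamp <= watermarkAtArrival;
        }
    }

    private long currentWatermark = Long.MIN_VALUE;
    final List<Enriched> out = new ArrayList<>();

    // Watermarks only move forward within one operator.
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    // Corresponds to processElement(): read timestamp and watermark, emit a new record.
    void processElement(long timestamp) {
        out.add(new Enriched(timestamp, currentWatermark));
    }

    public static void main(String[] args) {
        WatermarkEnricher e = new WatermarkEnricher();
        e.onWatermark(10);
        e.processElement(12);
        e.onWatermark(20);
        e.processElement(15);
        System.out.println(e.out.get(1).wasLate()); // prints true
    }
}
```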
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> An addendum:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element, ...) of Trigger<IN, ...> the same as the IN provided to add(IN value) of Accumulator<IN, ...>? It seems that any mutation of IN in onElement() is not visible to the accumulator that carries it as a previous element reference, in the next invocation of add(). This seems to happen only in distributed mode, which makes sense only if these references point to different objects.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The pipeline:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>         public ACC createAccumulator() {
>>>>>>>>>>>>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>             newInstance.resetLocal();
>>>>>>>>>>>>>>>>             return newInstance;
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>         public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>             /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>>             accumulator.add(value);
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>         .....
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The Trigger:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
>>>>>>>>>>>>>>>>         extends Trigger<T, W> {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method, specifically to supply the POJO with the watermark from the TriggerContext. The sequence of execution is:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window, which saves the POJO reference in the accumulator
>>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The next add() call should see the last reference, including any mutation done in step 3.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, i.e. I can see the mutation made by onElement() on the POJO in the subsequent add(), but not on a distributed cluster.
>>>>>>>>>>>>>>>>> The specific question I had is whether add() on the accumulator supplied for a window and the onElement() method of the trigger on that window are executed inline, on the same thread, or whether there is any serialization/deserialization or IPC that causes this divergence (local versus distributed).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards.
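Fabian's Dec 19 answer (the object might have been serialized and stored in RocksDB) points to one mechanism behind the local-versus-distributed divergence asked about here. The following sketch uses hypothetical names, with plain Java serialization standing in for Flink's own serializers. It shows that once an element has gone through a serialize/deserialize round trip, mutating the original object is not visible in the stored copy. Whether a given setup keeps references or serialized copies depends on the state backend and configuration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Demonstrates how serialized state detaches stored objects from the ones user code mutates.
final class SerializedCopyDemo {

    // Hypothetical POJO standing in for the IN type of the pipeline.
    static final class Pojo implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark = Long.MIN_VALUE;
    }

    // Serialize and deserialize, roughly what a serializing state backend does
    // between writing an element (in add()) and reading it back later.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(value);
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) ois.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Pojo inTrigger = new Pojo();
        Pojo inState = roundTrip(inTrigger); // the copy a later add() would see
        inTrigger.watermark = 42L;           // mutation made in onElement()
        System.out.println(inState.watermark == Long.MIN_VALUE); // prints true
    }
}
```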