Yep, though this is suboptimal, as you imagined. Two things:

* <IN> internally has an <INLite>, an ultra-lite version of IN that carries only what the path analysis requires.
* Sessionization being expensive, we piggyback multiple other aggregations that do not depend on the path or order (counts etc.).

Essentially a Session is (ordered path + accumulated stats).
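To make that split concrete, here is a minimal, hypothetical sketch in plain Java (no Flink types). The names EventLite, SessionSketch, add and reduceToWatermark are invented for illustration and are not the actual classes; it only shows an ordered path that gets trimmed up to a watermark sitting next to an order-independent count that does not.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical lite event: only what path analysis needs (a name and an event time). */
final class EventLite {
    final String page;
    final long timestamp;

    EventLite(String page, long timestamp) {
        this.page = page;
        this.timestamp = timestamp;
    }
}

/** Hypothetical session: ordered path (order-dependent) plus accumulated stats (order-independent). */
final class SessionSketch {
    private final List<EventLite> pending = new ArrayList<>(); // events not yet covered by a watermark
    private long count;                                        // piggybacked aggregate

    void add(EventLite e) {
        pending.add(e);
        count++; // does not depend on arrival order
    }

    /** Removes and returns, in event-time order, the events with timestamp <= watermark. */
    List<EventLite> reduceToWatermark(long watermark) {
        pending.sort(Comparator.comparingLong((EventLite e) -> e.timestamp));
        List<EventLite> safe = new ArrayList<>();
        while (!pending.isEmpty() && pending.get(0).timestamp <= watermark) {
            safe.add(pending.remove(0));
        }
        return safe;
    }

    long count() { return count; }

    int pendingSize() { return pending.size(); }
}
```

The count survives every reduceToWatermark call, while the pending list only ever holds events later than the last watermark seen, which is what keeps the ordered part of the state bounded.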
The code seems pretty all right; please tell me if you need to see it. It is all generics, so no secrets here.

On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> you would not need the ListStateDescriptor. A ProcessWindowFunction stores all events that are assigned to a window (IN objects in your case) in an internal ListState.
> The Iterable<IN> parameter of the process() method iterates over the internal list state.
>
> So you would have a Trigger that fires when a new watermark is received (or in regular intervals like every minute) and at the end of the window.
> The process() method looks up the current watermark in the Context object, traverses the Iterable<IN> filtering out all events with timestamp > watermark (you would need to enrich the events with the timestamp, which can be done in a ProcessFunction), inserts the remaining ones into a sorted data structure (possibly leveraging the almost sorted nature of the events) and creates a Session from it.
>
> This is probably less efficient than your ProcessFunction because process() would go over the complete list over and over again and not be able to persist the result of previous invocations.
> However, the code should be easier to maintain.
>
> Does that make sense?
>
> Best, Fabian
>
> 2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Hello Fabian, Thank you for your response.
>>
>> I thought about it and maybe I am missing something obvious here. The code below is what I think you suggest. The issue is that the window now is a list of Sessions (or, shall we say, subsets of the Session).
>>
>> What is required is that on a new watermark
>>
>> * We sort these Session objects
>> * Get the subset that is before the new watermark and emit without purge.
>>
>> I do not see how the Trigger approach helps us.
>> It does tell us that the watermark has progressed, but to get the subset of the ListState that falls before the watermark we would need access to *the new value of the watermark*. That was what my initial query was.
>>
>> public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>>
>>         extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>>
>>     OUT toCreateNew;
>>     Long gap;
>>     private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;
>>
>>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType, OUT toCreateNew) {
>>         this.toCreateNew = toCreateNew;
>>         mergingSetsStateDescriptor = new ListStateDescriptor<>("sessions", aggregationResultType);
>>     }
>>
>>     @Override
>>     public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
>>         OUT session = toCreateNew.createNew();
>>         elements.forEach(f -> session.add(f));
>>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>>     }
>> }
>>
>> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> thanks for sharing your solution!
>>>
>>> Looking at this issue again and your mail in which you shared your SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the ValueState that prevents the ProcessWindowFunction from being used in a mergeable window.
>>> You could have created a new Session object in each invocation of the ProcessWindowFunction and simply kept the elements in the (mergeable) list state of the window.
>>> In that case you would simply need a custom trigger that calls the ProcessWindowFunction when a new watermark arrives. For intermediate calls, you just FIRE and for the final call you FIRE_AND_PURGE to remove the elements from the window's state.
>>> Did you try that?
>>>
>>> Best, Fabian
>>>
>>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> Dear Fabian,
>>>>
>>>> I was able to create a pretty functional ProcessFunction. Here is the synopsis; please see if it makes sense.
>>>>
>>>> Sessionization is unique in that it entails windows of dynamic length. The way Flink approaches it is pretty simple. It will create a TimeWindow of size "gap" relative to the event time, find an overlapping window (intersection) and create a covering window. Each such window has a "state" associated with it, which also has to be merged when a covering window is created on intersection of 2 or more incident windows. To be more precise, if Window1 spans (t1, t2) and a new record creates a window (t3, t4) with t1 <= t3 <= t2, a new window (t1, t4) is created and the associated states are merged.
>>>>
>>>> In the current Window API the states are external and are Accumulator based. This approach pretty much works for all cases where the aggregation is accumulative/reduced and does not depend on order, as in no ordered list of incoming records needs to be kept and the reduction is to a single aggregated element (think counts, min, max etc.). In path analysis (and other use cases) however this approach has drawbacks. Even though in our accumulator we could keep an ordered list of events, it becomes unreasonable if not kept within bounds. An approach that does *attempt* to bound state is to preemptively analyze paths using the WM as the marker that defines the *subset* of the state that is safe to analyze. So if we have n events in the window state and m fall before the WM, we can safely analyze the m subset, emitting paths seen and reducing the cumulative state size. There are caveats though that I will go into later.
>>>>
>>>> Unfortunately the Accumulators in the Flink Window runtime defaults do not have access to the WM.
>>>>
>>>> This led to this generic approach (implemented and tested):
>>>>
>>>> * Use a low level ProcessFunction that allows access to the WM and is definitely nearer to the guts of Flink.
>>>>
>>>> * Still use the merge-windows-on-intersection approach but use the WM to trigger (through Timers) reductions in state. This is not very dissimilar to what Flink does but we have more control over what to do and when to do it. Essentially we have exposed a lifecycle method that reacts to WM progression.
>>>>
>>>> * There are essentially 2 Timers. The first timer is on the maxTimeStamp() of a Window, which, if there is no further mutation because of a merge etc., will fire to reflect a Session End. The second one is on currentWaterMark+1 and essentially calls a "reduceToWM" on each keyed Window and thus State.
>>>>
>>>> * There are 2 ways to short circuit a Session: 1. On Session time span 2. On Session size.
>>>>
>>>> * There is a safety valve to blacklist keys when it is obvious that it is a bot ( again
>>>>
>>>> The solution will thus preemptively push out Patterns (and correct patterns) while keeping the ordered state within reasonable bounds. The incident data of course has to be analyzed: are the paths too large etc. But one has full control over how to fashion the solution.
>>>>
>>>> Regards and Thanks,
>>>>
>>>> Vishal
>>>>
>>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> This makes sense. Thanks.
>>>>>
>>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> all calls to onElement() or onTimer() are synchronized for any keys.
>>>>>> Think of a single thread calling these methods.
>>>>>> Event-time timers are called when a watermark passes the timer. Watermarks are received as special records, so the methods are called in the same order as records (actual records or watermarks) arrive at the function. Only for processing-time timers is actual synchronization required.
>>>>>>
>>>>>> The NPE might be thrown because of two timers that fire one after the other without a new record being processed in between the onTimer() calls.
>>>>>> In that case the state is cleared in the first call and null in the second.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> I have a few follow up questions regarding ProcessFunction. I think that the core should take care of any synchronization issues between calls to onElement and onTimer in case of a keyed stream, but tests do not seem to suggest that.
>>>>>>>
>>>>>>> I have specifically 2 questions.
>>>>>>>
>>>>>>> 1. Are calls to onElement(..) single threaded if scoped to a key? As in, on a keyed stream, is there a way that 2 or more threads can execute on more than one element of a single key at one time? Would I have to synchronize this construction
>>>>>>>
>>>>>>> OUT accumulator = accumulatorState.value();
>>>>>>> if (accumulator == null) {
>>>>>>>     accumulator = acc.createNew();
>>>>>>> }
>>>>>>>
>>>>>>> 2. Can concurrent calls happen to onTimer(..) and onElement(..) for the same key? I intend to clean up state but I see NullPointers thrown in onTimer(..) and I presume it is because onElement and onTimer are executed on 2 separate threads, with onTimer removing the state ( clear() ) but after another thread has registered a Timer ( in onElement ).
>>>>>>>
>>>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
>>>>>>>     accumulatorState.clear();
>>>>>>> }
>>>>>>>
>>>>>>> PS. This is the full code.
>>>>>>>
>>>>>>> @Override
>>>>>>> public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>>>>>     TimerService timerService = context.timerService();
>>>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>>         if (accumulator == null) {
>>>>>>>             accumulator = acc.createNew();
>>>>>>>         }
>>>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>>>         accumulatorState.update(accumulator);
>>>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
>>>>>>>         accumulatorState.clear();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> That's correct. Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
>>>>>>>> The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
>>>>>>>> When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
>>>>>>>> If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per timestamp, there is only one timer which gets continuously overwritten. The timer is called when the watermark is advanced.
>>>>>>>>
>>>>>>>> On the performance of the timer service: AFAIK, all methods that work with some kind of timer use this service. So there is not much choice.
>>>>>>>>
>>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>
>>>>>>>>> And that further begs the question: how performant is the Timer Service? I tried to peruse the architecture behind it but could not find a definite clue. Is it a Scheduled Service and if yes how many threads etc...
>>>>>>>>>
>>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Makes sense. Did a first stab at using ProcessFunction. The TimerService exposed by the Context does not have a remove timer method. Is it primarily because a PriorityQueue is the storage and removal from a PriorityQueue is expensive? The Trigger Context does expose another version that has removal abilities, so I was wondering why this dissonance...
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> it is not guaranteed that add() and onElement() receive the same object, and even if they do it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
>>>>>>>>>>> Hence, elements should not be modified in onElement().
>>>>>>>>>>>
>>>>>>>>>>> Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
>>>>>>>>>>> This might be more code but easier to design and reason about because there is no interaction of window assigner, trigger, and window function.
>>>>>>>>>>>
>>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>>
>>>>>>>>>>>> is where we could fashion what is emitted. Again, for us it seems natural to use the WM to materialize micro batches with "approximate" order ( and no I am not a fan of spark micro batches :)). Any pointers as to how we could write an implementation that allows for "up till WM emission" through a trigger on a Session Window would be very helpful. In essence I believe that for any "funnel" analysis it is crucial.
>>>>>>>>>>>>
>>>>>>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>>
>>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The Trigger in this case would be some CountBased Trigger....
>>>>>>>>>>>>> Again the motive is to keep the state lean as we desire to search for patterns, sorted on event time, in the incoming sessionized ( and thus of non-deterministic length ) stream....
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class in this encapsulates the trim-up-to-watermark behavior ( reduce call after telling it the current WM ) we desire
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>         Session s = session.value() != null ?
>>>>>>>>>>>>>>                 session.value() : new Session();
>>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think that does not work, as it is the WM of the Window Operator that is desired to make deterministic decisions, rather than that of an operator that precedes the Window? This is doable using ProcessWindowFunction using state but only in the case of non mergeable windows.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The best API option I think is a TimeBaseTrigger that fires on every configured time progression of the WM and a Window implementation that materializes *only data up till that WM* ( it might have more data but that data has event time greater than the WM ). I am not sure we have that built in option and thus was asking for access to the current WM for the window operator to allow us to handle the *only data up till that WM* range retrieval using some custom data structure.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to control when a window is evaluated.
>>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
>>>>>>>>>>>>>>>> The context object of the processElement() method gives access to the current watermark and timestamp of a record.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element.. ) in Trigger<IN,..> the same as the IN provided to add(IN value) in Accumulator<IN,..>? It seems that any mutation of IN in onElement() is not visible to the Accumulator that is carrying it as a previous element reference, albeit in the next invocation of add(). This seems to happen only in distributed mode, which makes sense only if these references point to different objects.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>         public ACC createAccumulator() {
>>>>>>>>>>>>>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>>             newInstance.resetLocal();
>>>>>>>>>>>>>>>>>             return newInstance;
>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>         public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>>             /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>>>             accumulator.add(value);
>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>         .....
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The Trigger
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
>>>>>>>>>>>>>>>>>         extends Trigger<T, W> {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method, specifically supply the POJO with the watermark from the TriggerContext. The sequence of execution is:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window and save the POJO reference in the Accumulator.
>>>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>>>>>>>> 3. set watermark to the POJO
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The next add() method should have the last reference and any mutation done in step 3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> That works in a local test case, using LocalFlinkMiniCluster, as in I have access to the mutation made by onElement() to the POJO in the subsequent add(), but not on a distributed cluster. The specific question I had is whether add() on a supplied accumulator on a window and the onElement() method of the trigger on that window are inline executions on the same thread, or is there any serialization/deserialization IPC that causes this divergence ( local versus distributed )
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards.