Yep, though this is suboptimal, as you imagined. Two things:

* <IN> internally has an <INLite>, an ultra-lite version of IN that is only
required for the path analysis.
* Because sessionization is expensive, we piggyback multiple other aggregations
that do not depend on the path or order (counts, etc.). Essentially, a Session
is (ordered path + accumulated stats).

The code seems pretty much all right; please tell me if you need to see it.
It is all generics, so there are no secrets here.

On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> you would not need the ListStateDescriptor. For a ProcessWindowFunction,
> all events that are assigned to a window (IN objects in your case) are
> stored in an internal ListState.
> The Iterable<IN> parameter of the process() method iterates over that
> internal list state.
>
> So you would have a Trigger that fires when a new watermark is received
> (or at regular intervals, like every minute) and at the end of the window.
> The process() method looks up the current watermark in the Context object,
> traverses the Iterable<IN>, filters out all events with timestamp >
> watermark (you would need to enrich the events with the timestamp, which can
> be done in a ProcessFunction), inserts the remaining ones into a sorted
> data structure (possibly leveraging the almost-sorted nature of the events),
> and creates a Session from it.
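A plain-Java sketch of the process() logic described above, with the Flink types left out so it runs on its own. `TimedEvent` and `pathUpToWatermark` are hypothetical names; in a real ProcessWindowFunction the Iterable would be the `elements` parameter and the watermark would come from `context.currentWatermark()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type: a page id plus its event-time timestamp
// (assumes the timestamp enrichment mentioned above has already happened).
final class TimedEvent {
    final String page;
    final long timestamp;

    TimedEvent(String page, long timestamp) {
        this.page = page;
        this.timestamp = timestamp;
    }
}

final class WatermarkSessionSketch {

    // Models one process() call: keep only events at or before the watermark,
    // sort them by event time and return the ordered path. The window contents
    // are not modified, so every firing re-scans the complete list.
    static List<String> pathUpToWatermark(Iterable<TimedEvent> windowContents, long watermark) {
        List<TimedEvent> safe = new ArrayList<>();
        for (TimedEvent e : windowContents) {
            if (e.timestamp <= watermark) { // filter out events with timestamp > watermark
                safe.add(e);
            }
        }
        // The JDK's List.sort is an adaptive merge sort, which is fast on almost-sorted input.
        safe.sort(Comparator.comparingLong((TimedEvent e) -> e.timestamp));
        List<String> path = new ArrayList<>();
        for (TimedEvent e : safe) {
            path.add(e.page);
        }
        return path;
    }
}
```

With events home@10, cart@20, checkout@30 and late@50 and a watermark of 30, this returns [home, cart, checkout]; the late event stays in the window for a later firing.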
>
> This is probably less efficient than your ProcessFunction because
> process() would traverse the complete list over and over again and would
> not be able to persist the result of previous invocations.
> However, the code should be easier to maintain.
>
> Does that make sense?
>
> Best, Fabian
>
> 2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Hello Fabian, thank you for your response.
>>
>> I thought about it, and maybe I am missing something obvious here. The
>> code below is what I think you suggest. The issue is that the window state
>> is now a list of Sessions (or, shall we say, subsets of the Session).
>>
>> What is required is that, on a new watermark, we
>>
>> * sort these Session objects
>> * get the subset that falls before the new watermark and emit it without
>> purging.
>>
>> I do not see how the Trigger approach helps us. It does tell us that the
>> watermark has progressed, but to get the subset of the ListState that falls
>> before the watermark, we would need access to *the new value of the
>> watermark*. That was my initial query.
>>
>>
>>
>> public class SessionProcessWindow<IN extends HasTime & HasKey,
>>         OUT extends SessionState<IN, OUT>>
>>         extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>>
>>     // Prototype from which a fresh Session is created on every invocation.
>>     private final OUT toCreateNew;
>>     // Session gap (not used in this snippet).
>>     private Long gap;
>>     // Window-scoped list state that collects the Session built on each firing.
>>     private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;
>>
>>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
>>                                 OUT toCreateNew) {
>>         this.toCreateNew = toCreateNew;
>>         this.mergingSetsStateDescriptor =
>>                 new ListStateDescriptor<>("sessions", aggregationResultType);
>>     }
>>
>>     @Override
>>     public void process(String s, Context context, Iterable<IN> elements,
>>                         Collector<OUT> out) throws Exception {
>>         // Build a new Session from all elements currently in the window ...
>>         OUT session = toCreateNew.createNew();
>>         elements.forEach(f -> session.add(f));
>>         // ... and append it to the per-window list state (nothing is emitted here).
>>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>>     }
>> }
>>
>>
>> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> thanks for sharing your solution!
>>>
>>> Looking at this issue again and at the mail in which you shared your
>>> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
>>> ValueState that prevents the ProcessWindowFunction from being used in a
>>> mergeable window.
>>> You could have created a new Session object in each invocation of the
>>> ProcessWindowFunction and simply kept the elements in the (mergeable) list
>>> state of the window.
>>> In that case you would only need a custom trigger that calls the
>>> ProcessWindowFunction when a new watermark arrives. For intermediate calls
>>> you just FIRE, and for the final call you FIRE_AND_PURGE to remove the
>>> elements from the window's state.
>>> Did you try that?
>>>
>>> Best, Fabian
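The custom trigger suggested above can be modeled in plain Java as follows. This is a sketch under assumptions: `FireDecision` only mirrors the names of two Flink `TriggerResult` values, and `WatermarkTriggerModel` is hypothetical. In an actual `Trigger`, `onEventTime()` would return the matching `TriggerResult` and register the next timer through `TriggerContext.registerEventTimeTimer()`; because session windows merge, a real trigger would also have to implement `canMerge()`/`onMerge()`, which is not shown.

```java
import java.util.OptionalLong;

// Stand-in for the two Flink TriggerResult values this trigger needs; not Flink's class.
enum FireDecision { FIRE, FIRE_AND_PURGE }

// Hypothetical model of the custom trigger: FIRE (keep the window's elements)
// on each watermark advance, FIRE_AND_PURGE when the watermark reaches the window end.
final class WatermarkTriggerModel {

    // Decision for an event-time timer at `time` on a window whose last timestamp
    // (TimeWindow.maxTimestamp() in Flink) is windowMaxTimestamp.
    static FireDecision onEventTime(long time, long windowMaxTimestamp) {
        return time >= windowMaxTimestamp ? FireDecision.FIRE_AND_PURGE : FireDecision.FIRE;
    }

    // Next timer to register so that the following watermark advance fires again.
    // Empty once the end-of-window timer already covers the rest of the window's life
    // (the comparison also avoids overflow when the watermark is Long.MAX_VALUE).
    static OptionalLong nextWatermarkTimer(long currentWatermark, long windowMaxTimestamp) {
        return currentWatermark < windowMaxTimestamp - 1
                ? OptionalLong.of(currentWatermark + 1)
                : OptionalLong.empty();
    }
}
```

For a window ending at 100, a timer at 5 yields FIRE, the timer at 100 yields FIRE_AND_PURGE, and with the watermark at 41 the next timer is registered at 42.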
>>>
>>>
>>>
>>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> Dear Fabian,
>>>>
>>>> I was able to create a pretty functional ProcessFunction. Here is the
>>>> synopsis; please see if it makes sense.
>>>>
>>>> Sessionization is unique in that it entails windows of dynamic length.
>>>> Flink's approach is pretty simple. It creates a TimeWindow of size "gap"
>>>> relative to the event time, finds an overlapping window (intersection),
>>>> and creates a covering window. Each such window has a "state" associated
>>>> with it, which also has to be merged when a covering window is created
>>>> from the intersection of 2 or more incident windows. To be more precise,
>>>> if Window1 spans (t1, t2) and a new record creates a window (t3, t4) with
>>>> t1 <= t3 <= t2, a new window (t1, max(t2, t4)) is created and the
>>>> associated states are merged.
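The merge rule above can be sketched in plain Java. The `IntervalWindow` and `SessionMerger` classes are hypothetical; `intersects()` and `cover()` are modeled on Flink's `TimeWindow` methods of the same name, which are assumed here to treat windows that merely touch as intersecting. State merging is left out; only the window bounds are tracked.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical window [start, end); intersects()/cover() are modeled on Flink's TimeWindow.
final class IntervalWindow {
    final long start;
    final long end;

    IntervalWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Inclusive comparison: windows that merely touch also count as intersecting.
    boolean intersects(IntervalWindow other) {
        return start <= other.end && end >= other.start;
    }

    // Smallest window covering both.
    IntervalWindow cover(IntervalWindow other) {
        return new IntervalWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

// Hypothetical merger for one key; the per-window state merge is omitted.
final class SessionMerger {
    private final long gap;
    private final List<IntervalWindow> windows = new ArrayList<>();

    SessionMerger(long gap) {
        this.gap = gap;
    }

    // Each record opens [ts, ts + gap); every existing window it intersects is
    // removed and folded into one covering window.
    void add(long ts) {
        IntervalWindow merged = new IntervalWindow(ts, ts + gap);
        Iterator<IntervalWindow> it = windows.iterator();
        while (it.hasNext()) {
            IntervalWindow w = it.next();
            if (w.intersects(merged)) {
                merged = merged.cover(w);
                it.remove();
            }
        }
        windows.add(merged);
    }

    List<IntervalWindow> windows() {
        return windows;
    }
}
```

With a gap of 10, records at 0, 5 and 30 produce [0, 15) and [30, 40); a record at 20 then opens [20, 30), which touches [30, 40), so the two are covered by [20, 40).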
>>>>
>>>>
>>>> In the current Window API the states are external and Accumulator based.
>>>> This approach works for pretty much all cases where the aggregation is
>>>> accumulative/reduced and does not depend on order, i.e. no ordered list
>>>> of incoming records needs to be kept and the reduction is to a single
>>>> aggregated element (think counts, min, max, etc.). In path analysis (and
>>>> other use cases), however, this approach has drawbacks. Even though our
>>>> accumulator could keep an ordered list of events, that becomes
>>>> unreasonable if the list is not bounded. An approach that does *attempt*
>>>> to bound state is to preemptively analyze paths using the WM as the
>>>> marker that defines the *subset* of the state that is safe to analyze. So
>>>> if we have n events in the window state and m fall before the WM, we can
>>>> safely analyze the subset of m, emitting the paths seen and reducing the
>>>> cumulative state size. There are caveats, though, that I will go into
>>>> later.
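A minimal plain-Java sketch of that bounded-state idea, assuming events are buffered in a priority queue ordered by event time. `PageView`, `BoundedPathState` and `reduceToWM` are hypothetical names; a real job would keep this in Flink keyed state rather than in plain fields.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event: event-time timestamp plus the page visited.
final class PageView {
    final long timestamp;
    final String page;

    PageView(long timestamp, String page) {
        this.timestamp = timestamp;
        this.page = page;
    }
}

// Hypothetical per-key state: only events above the last watermark stay buffered.
final class BoundedPathState {
    // Not-yet-analyzed events, ordered by event time (ties come out in no particular order).
    private final PriorityQueue<PageView> pending =
            new PriorityQueue<>(Comparator.comparingLong((PageView v) -> v.timestamp));
    // Path emitted so far; a real job might keep only a summary of it.
    private final List<String> emitted = new ArrayList<>();

    void add(PageView view) {
        pending.add(view);
    }

    // Removes the m events with timestamp <= watermark from the buffer and returns
    // their pages in event-time order; the n - m later events stay buffered.
    List<String> reduceToWM(long watermark) {
        List<String> step = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            step.add(pending.poll().page);
        }
        emitted.addAll(step);
        return step;
    }

    int bufferedSize() {
        return pending.size();
    }

    List<String> emittedPath() {
        return emitted;
    }
}
```

Events whose timestamp is at or below an already-reduced watermark would end up out of order in the emitted path, which is one of the caveats; this sketch does not handle them.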
>>>>
>>>>
>>>>
>>>> Unfortunately, the Accumulators used by Flink's default window runtime do
>>>> not have access to the WM.
>>>>
>>>>
>>>> This led to the following generic approach (implemented and tested):
>>>>
>>>>
>>>> * Use a low-level ProcessFunction, which allows access to the WM and is
>>>> definitely nearer to the guts of Flink.
>>>>
>>>>
>>>> * Still use the merge-windows-on-intersection approach, but use the WM to
>>>> trigger (through timers) reductions in state. This is not very
>>>> dissimilar to what Flink does, but we have more control over what to do
>>>> and when to do it. Essentially, I have exposed a lifecycle method that
>>>> reacts to WM progression.
>>>>
>>>>
>>>> * There are essentially 2 timers. The first timer is at the maxTimestamp()
>>>> of a window, which, if there is no further mutation because of a merge
>>>> etc., will fire to signal a Session end. The second one is on
>>>> currentWatermark + 1 and essentially calls a "reduceToWM" on each keyed
>>>> window and thus on its state.
>>>>
>>>>
>>>> * There are 2 ways to short-circuit a Session: 1. on session time span,
>>>> 2. on session size.
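The two short-circuit rules can be expressed as a small predicate. The class name and the limits are hypothetical; the thread does not say which values were used.

```java
// Hypothetical short-circuit rules for a session; the limits are illustrative only.
final class ShortCircuitPolicy {
    private final long maxSpanMillis;
    private final int maxEvents;

    ShortCircuitPolicy(long maxSpanMillis, int maxEvents) {
        this.maxSpanMillis = maxSpanMillis;
        this.maxEvents = maxEvents;
    }

    // Close the session early if it spans too much event time (rule 1)
    // or has accumulated too many events (rule 2).
    boolean shouldClose(long firstEventTs, long lastEventTs, int eventCount) {
        boolean tooLong = lastEventTs - firstEventTs >= maxSpanMillis;
        boolean tooBig = eventCount >= maxEvents;
        return tooLong || tooBig;
    }
}
```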
>>>>
>>>>
>>>> * There is a safety valve to blacklist keys when it is obvious that it
>>>> is a bot ( again
>>>>
>>>>
>>>> The solution will thus preemptively push out patterns (and correct
>>>> patterns) while keeping the ordered state within reasonable bounds. The
>>>> incident data, of course, has to be analyzed: are the paths too large,
>>>> etc. But one has full control over how to fashion the solution.
>>>>
>>>>
>>>>
>>>>
>>>> Regards and Thanks,
>>>>
>>>>
>>>> Vishal
>>>>
>>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> This makes sense.  Thanks.
>>>>>
>>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> all calls to onElement() or onTimer() are synchronized, regardless of
>>>>>> the key. Think of a single thread calling these methods.
>>>>>> Event-time timers are called when a watermark passes the timer.
>>>>>> Watermarks are received as special records, so the methods are called in
>>>>>> the same order in which records (actual records or watermarks) arrive at
>>>>>> the function. Actual synchronization is only required for processing-time
>>>>>> timers.
>>>>>>
>>>>>> The NPE might be thrown because two timers fire one after the other
>>>>>> without a new record being processed in between the onTimer() calls.
>>>>>> In that case, the state is cleared in the first call and is null in the
>>>>>> second.
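A plain-Java model of that situation, with a HashMap standing in for keyed ValueState (the class and method names are hypothetical). One way it can happen, assuming out-of-order input: two events register timers at 15 and 10, the timer at 10 clears the state, and the null check lets the timer at 15 return instead of throwing the NPE.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a HashMap stands in for keyed ValueState, and the two
// methods mirror the processElement()/onTimer() pair quoted further down.
final class SessionTimeoutModel {
    static final class Acc {
        long lastModified;
    }

    private final long gap;
    private final Map<String, Acc> state = new HashMap<>();
    int clears = 0;

    SessionTimeoutModel(long gap) {
        this.gap = gap;
    }

    // Like processElement(): get-or-create the accumulator and record the event time.
    // (A real job would also register an event-time timer at ts + gap here.)
    void onElement(String key, long ts) {
        state.computeIfAbsent(key, k -> new Acc()).lastModified = ts;
    }

    // Like onTimer(), plus a null check: an earlier timer for the same key
    // may already have cleared the state.
    void onTimer(String key, long timestamp) {
        Acc acc = state.get(key);
        if (acc == null) {
            return; // state already cleared; nothing to do
        }
        if (timestamp == acc.lastModified + gap) {
            state.remove(key);
            clears++;
        }
    }

    boolean hasState(String key) {
        return state.containsKey(key);
    }
}
```

Since onElement() and onTimer() are not called concurrently, no locking is involved; the guard only covers the sequential case described above.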
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com
>>>>>> >:
>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I have a few follow-up questions regarding ProcessFunction. I think that
>>>>>>> the core should take care of any synchronization issues between calls to
>>>>>>> onElement and onTimer in the case of a keyed stream, but tests do not
>>>>>>> seem to suggest that.
>>>>>>>
>>>>>>> Specifically, I have 2 questions.
>>>>>>>
>>>>>>>
>>>>>>> 1. Are calls to onElement(..) single-threaded when scoped to a key? That
>>>>>>> is, on a keyed stream, is there a way that 2 or more threads can execute
>>>>>>> on more than one element of a single key at one time? Would I have to
>>>>>>> synchronize this construction?
>>>>>>>
>>>>>>> OUT accumulator = accumulatorState.value();
>>>>>>> if (accumulator == null) {
>>>>>>>     accumulator = acc.createNew();
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for the
>>>>>>> same key? I intend to clean up state, but I see NullPointerExceptions
>>>>>>> thrown in onTimer(..), and I presume it is because onElement and onTimer
>>>>>>> are executed on 2 separate threads, with onTimer removing the state
>>>>>>> (clear()) after another thread has registered a timer (in onElement).
>>>>>>>
>>>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
>>>>>>>     accumulatorState.clear();
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> PS. This is the full code.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>> public void processElement(IN event, Context context, Collector<OUT> out)
>>>>>>>         throws Exception {
>>>>>>>     TimerService timerService = context.timerService();
>>>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>>         if (accumulator == null) {
>>>>>>>             accumulator = acc.createNew();
>>>>>>>         }
>>>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>>>         accumulatorState.update(accumulator);
>>>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out)
>>>>>>>         throws Exception {
>>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
>>>>>>>         accumulatorState.clear();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That's correct. Removal of timers is not supported in
>>>>>>>> ProcessFunction. Not sure why this is supported for Triggers.
>>>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>>>> timers and have a ValueState that stores the valid timestamp on which
>>>>>>>> the onTimer method should be executed.
>>>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>>>> whether the timestamp is the correct one and leaves the method if that
>>>>>>>> is not the case.
>>>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>>>> register multiple timers on (currentWatermark + 1). Since there is only
>>>>>>>> one timer per timestamp, there is only one timer, which gets
>>>>>>>> continuously overwritten. The timer is called when the watermark is
>>>>>>>> advanced.
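A plain-Java model of both workarounds for a single key. `TimerServiceModel` is hypothetical: a sorted set stands in for the timer queue (keeping one timer per timestamp, as described above), and `validTimestamp` stands in for the ValueState holding the timestamp that onTimer() should act on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key model of an event-time timer service. A TreeSet keeps
// one entry per timestamp, so registering the same timestamp twice yields one timer.
final class TimerServiceModel {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;
    private Long validTimestamp;                   // stands in for the ValueState of the workaround
    final List<Long> fired = new ArrayList<>();    // every timer that called onTimer()
    final List<Long> handled = new ArrayList<>();  // timers that passed the validity check

    long currentWatermark() {
        return watermark;
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Timers cannot be deleted, so a newer timestamp simply becomes the valid one.
    void setValidTimestamp(long timestamp) {
        validTimestamp = timestamp;
        registerEventTimeTimer(timestamp);
    }

    // A watermark advance fires, in timestamp order, every timer at or below it.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= newWatermark) {
            onTimer(timers.pollFirst());
        }
    }

    int pendingTimers() {
        return timers.size();
    }

    private void onTimer(long timestamp) {
        fired.add(timestamp);
        if (validTimestamp == null || timestamp != validTimestamp) {
            return; // not the valid timestamp: leave the method
        }
        handled.add(timestamp);
    }
}
```

Registering `currentWatermark() + 1` five times leaves a single pending timer, and after the valid timestamp moves from 130 to 150, both timers still fire but only 150 is handled.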
>>>>>>>>
>>>>>>>> Regarding the performance of the timer service: AFAIK, all methods that
>>>>>>>> work with some kind of timer use this service, so there is not much
>>>>>>>> choice.
>>>>>>>>
>>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>
>>>>>>>>> And that further raises the question: how performant is the Timer
>>>>>>>>> Service? I tried to peruse the architecture behind it but could not
>>>>>>>>> find a definite clue. Is it a scheduled service, and if so, how many
>>>>>>>>> threads does it use, etc.?
>>>>>>>>>
>>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Makes sense. I made a first stab at using ProcessFunction. The
>>>>>>>>>> TimerService exposed by the Context does not have a way to remove a
>>>>>>>>>> timer. Is that primarily because a PriorityQueue is the storage, and
>>>>>>>>>> removal from a PriorityQueue is expensive? The Trigger context does
>>>>>>>>>> expose another version that has removal abilities, so I was wondering
>>>>>>>>>> why this dissonance...
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>>>>>>> object, and even if they do, it is not guaranteed that a mutation of
>>>>>>>>>>> the object in onElement() has an effect. The object might have been
>>>>>>>>>>> serialized and stored in RocksDB.
>>>>>>>>>>> Hence, elements should not be modified in onElement().
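A plain-Java illustration of why such mutations may be lost: once an object has been serialized (as when state is written to a backend such as RocksDB), the stored copy is independent of the original. Java serialization is used here only as a stand-in, since Flink's state backends use Flink's own serializers; `ClickEvent` and `SerializedCopyDemo` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical element that a trigger might try to "augment" with the watermark.
final class ClickEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark;
}

final class SerializedCopyDemo {
    // Writes the object to bytes and reads it back, the way state is stored in and
    // loaded from a serializing backend; the result is a separate object.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

Setting `watermark` on the original after the round trip leaves the stored copy at 0, which matches the local-versus-distributed difference reported further down in this thread.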
>>>>>>>>>>>
>>>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>>>> This might be more code but is easier to design and reason about,
>>>>>>>>>>> because there is no interaction of window assigner, trigger, and
>>>>>>>>>>> window function.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> I guess
>>>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>>
>>>>>>>>>>>> is where we could fashion what is emitted. Again, for us it seems
>>>>>>>>>>>> natural to use the WM to materialize micro-batches with "approximate"
>>>>>>>>>>>> order (and no, I am not a fan of Spark micro-batches :)). Any pointers
>>>>>>>>>>>> on how we could write an implementation that allows for "up till WM"
>>>>>>>>>>>> emission through a trigger on a Session Window would be very helpful.
>>>>>>>>>>>> In essence, I believe that for any "funnel" analysis this is crucial.
>>>>>>>>>>>>
>>>>>>>>>>>> Something like
>>>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>>
>>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The Trigger in this case would be some count-based Trigger....
>>>>>>>>>>>>> Again, the motive is to keep the state lean, as we desire to search
>>>>>>>>>>>>> for patterns, sorted on event time, in the incoming sessionized (and
>>>>>>>>>>>>> thus of nondeterministic length) stream....
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>>>> complain about MergeableWindow and state. The Session class in this
>>>>>>>>>>>>>> encapsulates the trim-up-to-watermark behavior we desire (a reduce
>>>>>>>>>>>>>> call after telling it the current WM):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>>>>>>>>>>>                         Collector<Session> out) throws Exception {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Fabian, thank you for the response.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think that does not work, as it is the WM of the window operator
>>>>>>>>>>>>>>> that is needed to make deterministic decisions, rather than that of
>>>>>>>>>>>>>>> an operator that precedes the window. This is doable with a
>>>>>>>>>>>>>>> ProcessWindowFunction using state, but only in the case of
>>>>>>>>>>>>>>> non-mergeable windows.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The best API option, I think, is a time-based trigger that fires on
>>>>>>>>>>>>>>> every configured progression of the WM, plus a window implementation
>>>>>>>>>>>>>>> that materializes *only data up till that WM* (the window might hold
>>>>>>>>>>>>>>> more data, but that data has event time greater than the WM). I am
>>>>>>>>>>>>>>> not sure we have such a built-in option, and thus I was asking for
>>>>>>>>>>>>>>> access to the current WM of the window operator, to allow us to
>>>>>>>>>>>>>>> handle the "*only data up till that WM*" range retrieval using some
>>>>>>>>>>>>>>> custom data structure.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records with
>>>>>>>>>>>>>>>> the current watermark before passing them into the window
>>>>>>>>>>>>>>>> operator.
>>>>>>>>>>>>>>>> The context object of the processElement() method gives access to
>>>>>>>>>>>>>>>> the current watermark and timestamp of a record.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by 
>>>>>>>>>>>>>>>> an operator.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element, ..) in
>>>>>>>>>>>>>>>>> Trigger<IN, ..> the same as the IN provided to add(IN value) in
>>>>>>>>>>>>>>>>> Accumulator<IN, ..>? It seems that any mutations to IN in
>>>>>>>>>>>>>>>>> onElement() are not visible to the Accumulator that is carrying it
>>>>>>>>>>>>>>>>> as a previous element reference, albeit in the next invocation of
>>>>>>>>>>>>>>>>> add(). This seems to happen only in distributed mode, which makes
>>>>>>>>>>>>>>>>> sense only if these references point to different objects.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>             .....
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    The Trigger
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>>>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>>>>>>>>>>>> TriggerContext. The sequence of execution is:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window, which saves the
>>>>>>>>>>>>>>>>>> POJO reference in the Accumulator.
>>>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The next add() call should see the last reference and any mutation
>>>>>>>>>>>>>>>>>> done in step 3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that
>>>>>>>>>>>>>>>>>> I have access to the mutation made by onElement() on the POJO in the
>>>>>>>>>>>>>>>>>> subsequent add(), but not on a distributed cluster. The specific
>>>>>>>>>>>>>>>>>> question I have is whether add() on a supplied accumulator on a
>>>>>>>>>>>>>>>>>> window and the onElement() method of the trigger on that window are
>>>>>>>>>>>>>>>>>> inline executions on the same thread, or whether there is some
>>>>>>>>>>>>>>>>>> serialization/deserialization IPC that causes this divergence (local
>>>>>>>>>>>>>>>>>> versus distributed).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>>>
