Hi,

all calls to onElement() or onTimer() are synchronized, for all keys. Think
of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks
are received as special records, so the methods are called in the same
order in which records (actual records or watermarks) arrive at the function.
Actual synchronization is only required for processing-time timers.
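
A framework-free Java sketch of the ordering described above. It is a hypothetical model, not Flink code: the class and method names are made up, there is a single key, and the gap parameter is arbitrary. One loop consumes elements and watermarks in arrival order, and a watermark fires every pending event-time timer at or below it before the next input is handled, so the two callbacks cannot overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, framework-free model of one keyed operator task (NOT Flink code).
// A single thread consumes inputs in arrival order. A watermark fires every pending
// event-time timer with timestamp <= watermark before the next input is touched,
// so processElement() and onTimer() never run at the same time.
final class SingleThreadedTaskModel {

    static final class Input {
        final boolean isWatermark;
        final long timestamp;

        private Input(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }

        static Input element(long timestamp) { return new Input(false, timestamp); }

        static Input watermark(long timestamp) { return new Input(true, timestamp); }
    }

    private final long gap;
    // At most one timer per timestamp (this model has a single key).
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    final List<String> calls = new ArrayList<>();

    SingleThreadedTaskModel(long gap) {
        this.gap = gap;
    }

    void run(List<Input> inputs) {
        for (Input in : inputs) { // one thread, one input at a time
            if (in.isWatermark) {
                // Fire all timers the watermark has passed, earliest first.
                while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= in.timestamp) {
                    calls.add("onTimer(" + eventTimeTimers.pollFirst() + ")");
                }
            } else {
                calls.add("processElement(" + in.timestamp + ")");
                eventTimeTimers.add(in.timestamp + gap);
            }
        }
    }
}
```

With a gap of 10, feeding element(1), element(5), watermark(12), element(20), watermark(30) records processElement(1), processElement(5), onTimer(11), processElement(20), onTimer(15), onTimer(30).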

The NPE might be thrown because two timers fire one after the other
without a new record being processed between the onTimer() calls. In
that case, the state is cleared in the first call and is null in the second.
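
One way two timers can fire back to back with the processElement()/onTimer() code quoted further down (a reading of that code, not something confirmed in this thread): an out-of-order but not late element overwrites lastModified with an older timestamp, so the earlier timer matches and clears the state, and the later timer then reads null. The sketch is a hypothetical plain-Java model of that code, not Flink code; the class names and the guardAgainstNull flag are made up. In the real function, the equivalent guard would be to return from onTimer() when accumulatorState.value() is null.

```java
import java.util.TreeSet;

// Hypothetical plain-Java model of the processElement()/onTimer() pair quoted in this
// thread (NOT Flink code). `state` stands in for accumulatorState; null means cleared.
final class TimerNullStateModel {

    static final class Accumulator {
        long lastModified;
    }

    private final long gap;
    private final boolean guardAgainstNull;
    private final TreeSet<Long> timers = new TreeSet<>();
    private Accumulator state;
    int clears;

    TimerNullStateModel(long gap, boolean guardAgainstNull) {
        this.gap = gap;
        this.guardAgainstNull = guardAgainstNull;
    }

    void processElement(long timestamp) {
        Accumulator acc = state != null ? state : new Accumulator();
        // Overwritten even when an out-of-order element carries an OLDER timestamp.
        acc.lastModified = timestamp;
        state = acc;
        timers.add(timestamp + gap);
    }

    void advanceWatermark(long watermark) {
        // Several timers can fire here, back to back, with no element in between.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        Accumulator acc = state;
        if (guardAgainstNull && acc == null) {
            return; // an earlier timer already cleared the state for this key
        }
        if (timestamp == acc.lastModified + gap) { // throws NPE when acc == null
            state = null;
            clears++;
        }
    }
}
```

With a gap of 10, elements at 100 and then 50 register timers at 110 and 60. Advancing the watermark to 200 fires 60 first (which matches 50 + 10 and clears the state), then 110, which throws in the unguarded model and is skipped in the guarded one.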

Best, Fabian

2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> Thanks.
>
> I have a few follow-up questions regarding ProcessFunction. I think
> that the core should take care of any synchronization issues between calls
> to onElement and onTimer in the case of a keyed stream, but my tests do not
> seem to suggest that.
>
> I have two specific questions.
>
> 1. Are calls to onElement(..) single-threaded when scoped to a key? That
> is, on a keyed stream, is there a way that two or more threads can process
> more than one element of a single key at the same time? Would I have to
> synchronize this construction?
>
>     OUT accumulator = accumulatorState.value();
>     if (accumulator == null) {
>         accumulator = acc.createNew();
>     }
>
> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for the
> same key? I intend to clean up state, but I see NullPointerExceptions thrown
> in onTimer(..), and I presume it is because onElement and onTimer are
> executed on two separate threads, with onTimer removing the state
> (clear()) after another thread has registered a timer (in onElement).
>
>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>         accumulatorState.clear();
>     }
>
> PS. This is the full code:
>
> @Override
> public void processElement(IN event, Context context, Collector<OUT> out)
>         throws Exception {
>     TimerService timerService = context.timerService();
>     if (context.timestamp() > timerService.currentWatermark()) {
>         OUT accumulator = accumulatorState.value();
>         if (accumulator == null) {
>             accumulator = acc.createNew();
>         }
>         accumulator.setLastModified(context.timestamp());
>         accumulatorState.update(accumulator);
>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>     }
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out)
>         throws Exception {
>     OUT accumulator = accumulatorState.value();
>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>         accumulatorState.clear();
>     }
> }
>
> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> That's correct. Removal of timers is not supported in ProcessFunction.
>> Not sure why this is supported for Triggers.
>> The common workaround for ProcessFunctions is to register multiple timers
>> and have a ValueState that stores the valid timestamp on which the onTimer
>> method should be executed.
>> When a timer fires and calls onTimer(), the method first checks whether
>> the timestamp is the correct one and leaves the method if that is not the
>> case.
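
A minimal sketch of that workaround, written as a hypothetical plain-Java model rather than Flink code: validTimestamp plays the role of a ValueState<Long>, the set of timers plays the role of the timer service (which cannot delete timers), and onTimer() leaves early for any timestamp other than the stored one. The gap parameter and the rule of never moving the deadline backwards are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical plain-Java model of the "many timers, one valid timestamp" workaround
// (NOT Flink code). `validTimestamp` stands in for a ValueState<Long>, `timers` for
// the timer service, which offers no way to delete a registered timer.
final class ValidTimestampModel {
    private final long gap;
    private final TreeSet<Long> timers = new TreeSet<>();
    private Long validTimestamp; // null == no state for this key
    final List<Long> handled = new ArrayList<>();

    ValidTimestampModel(long gap) {
        this.gap = gap;
    }

    void processElement(long timestamp) {
        long deadline = timestamp + gap;
        // Never move the deadline backwards, so an out-of-order element cannot shorten it.
        if (validTimestamp == null || deadline > validTimestamp) {
            validTimestamp = deadline;
        }
        timers.add(deadline); // earlier timers stay registered and are ignored later
    }

    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        if (validTimestamp == null || timestamp != validTimestamp) {
            return; // stale timer: leave the method without doing anything
        }
        handled.add(timestamp); // the real work (emit, clean up) would go here
        validTimestamp = null;  // clear the state
    }
}
```

With a gap of 10, elements at 1, 5 and 7 register timers at 11, 15 and 17; a watermark of 20 fires all three, but only the one at 17 does any work.
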
>> If you want to fire on the next watermark, another trick is to register
>> multiple timers on (currentWatermark + 1). Since there is only one timer
>> per key and timestamp, there is effectively a single timer that keeps
>> getting overwritten. It is called when the watermark is advanced.
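
The (currentWatermark + 1) trick can be modeled the same way. This is a hypothetical plain-Java sketch, not Flink code: a set stands in for the per-key timers, so registering the same timestamp again does not create a second timer, and a single callback runs once the watermark moves past it.

```java
import java.util.TreeSet;

// Hypothetical plain-Java model of the "currentWatermark + 1" trick (NOT Flink code).
// Timers are kept in a set, mirroring the fact that a key has at most one timer per
// timestamp, so re-registering the same timestamp does not add a second timer.
final class NextWatermarkTimerModel {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE; // no watermark received yet
    int onTimerCalls;

    void processElement() {
        timers.add(currentWatermark + 1); // same timestamp until the watermark moves
    }

    int pendingTimers() {
        return timers.size();
    }

    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            timers.pollFirst();
            onTimerCalls++; // onTimer() would run here
        }
    }
}
```

Three elements before the first watermark leave a single pending timer, and advancing the watermark runs onTimer() once.
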
>>
>> Regarding the performance of the timer service: AFAIK, all methods that
>> work with some kind of timer use this service, so there is not much choice.
>>
>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> And that raises a further question: how performant is the timer service? I
>>> tried to look through the architecture behind it but could not find a
>>> definite clue. Is it a scheduled service, and if so, how many threads, etc.?
>>>
>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Makes sense. I took a first stab at using ProcessFunction. The TimerService
>>>> exposed by the Context does not have a method to remove a timer. Is that
>>>> primarily because a priority queue is the storage and removal from a
>>>> PriorityQueue is expensive? The TriggerContext does expose a version that
>>>> can delete timers, so I was wondering about this dissonance...
>>>>
>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>> object in onElement() has an effect. The object might have been serialized
>>>>> and stored in RocksDB.
>>>>> Hence, elements should not be modified in onElement().
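
To make the serialization point concrete, here is a hypothetical sketch of a backend that keeps only serialized bytes. It swaps in plain java.io serialization for illustration (Flink uses its own TypeSerializers), and the class names are made up. Because every read deserializes a fresh copy, mutating the object after it was stored, as a Trigger's onElement() would, cannot change what the next reader sees.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical model of a state backend that keeps only serialized bytes, as RocksDB
// does (NOT Flink code; Flink uses its own TypeSerializers, not java.io serialization).
final class SerializedStateModel {

    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark = -1; // field a Trigger's onElement() might try to set
    }

    private byte[] stored;

    void put(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        stored = bytes.toByteArray(); // the live object itself is not retained
    }

    Event get() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
            return (Event) in.readObject(); // a fresh copy on every read
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```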
>>>>>
>>>>> Have you considered implementing the operation completely in a
>>>>> ProcessFunction instead of a session window?
>>>>> This might be more code but easier to design and reason about because
>>>>> there is no interaction of window assigner, trigger, and window function.
>>>>>
>>>>>
>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>
>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>
>>>>>> is where we could shape what is emitted. Again, for us it
>>>>>> seems natural to use the WM to materialize micro-batches with "approximate"
>>>>>> order (and no, I am not a fan of Spark micro-batches :)). Any pointers on
>>>>>> how we could write an implementation that allows for "up till WM
>>>>>> emission" through a trigger on a session window would be very helpful. In
>>>>>> essence, I believe it is crucial for any "funnel" analysis.
>>>>>>
>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>
>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> The Trigger in this case would be some count-based Trigger... Again,
>>>>>>> the motive is to keep the state lean, as we want to search for patterns,
>>>>>>> sorted on event time, in the incoming sessionized (and thus of
>>>>>>> nondeterministic length) stream...
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> For example, this would have worked perfectly if it did not complain
>>>>>>>> about MergeableWindow and state. The Session class here encapsulates
>>>>>>>> the trim-up-to-watermark behavior we desire (a reduce() call after
>>>>>>>> telling it the current WM).
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>>>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>>>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>
>>>>>>>> public class SessionProcessWindow
>>>>>>>>         extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>
>>>>>>>>     // Per-window state; rejected when the window assigner merges windows (e.g. session windows).
>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>>>>>             Collector<Session> out) throws Exception {
>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>         Session s = session.value();
>>>>>>>>         if (s == null) {
>>>>>>>>             s = new Session(); // first firing for this window
>>>>>>>>         }
>>>>>>>>         for (Event e : elements) {
>>>>>>>>             s.add(e);
>>>>>>>>         }
>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>         s.reduce(); // trim up to the current watermark
>>>>>>>>         out.collect(s);
>>>>>>>>         session.update(s);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void clear(Context context) {
>>>>>>>>         // Release the per-window state when the window expires.
>>>>>>>>         context.windowState().getState(sessionState).clear();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>
>>>>>>>>> I think that does not work, as it is the WM of the window
>>>>>>>>> operator that we need in order to make deterministic decisions, rather
>>>>>>>>> than the WM of an operator that precedes the window. This is doable
>>>>>>>>> using a ProcessWindowFunction with state, but only for non-mergeable
>>>>>>>>> windows.
>>>>>>>>>
>>>>>>>>> The best API option, I think, is a time-based trigger that fires
>>>>>>>>> on every configured progression of the WM, and a window implementation
>>>>>>>>> that materializes *only data up till that WM* (it might hold more
>>>>>>>>> data, but that data has an event time greater than the WM). I am not
>>>>>>>>> sure we have such a built-in option, and thus I was asking for access
>>>>>>>>> to the current WM of the window operator, to allow us to handle the
>>>>>>>>> "*only data up till that WM*" range retrieval using some custom data
>>>>>>>>> structure.
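
The "only data up till that WM" retrieval could be built on a sorted map. The sketch below is a hypothetical helper, not a Flink API, and how it would be wired into a trigger, a window, or ProcessFunction state is left open. Events are kept ordered by event time, and releaseUpTo(watermark) returns and forgets exactly those at or below the watermark, while later events stay buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical buffer (NOT a Flink API) for the "only data up till that WM" idea:
// events are kept sorted by event time, and releaseUpTo() hands out, and forgets,
// exactly those with eventTime <= watermark. Later events stay buffered.
final class UpToWatermarkBuffer<T> {
    private final TreeMap<Long, List<T>> byEventTime = new TreeMap<>();

    void add(long eventTime, T event) {
        byEventTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    List<T> releaseUpTo(long watermark) {
        // headMap(..., true) is a live view of all keys <= watermark.
        NavigableMap<Long, List<T>> ready = byEventTime.headMap(watermark, true);
        List<T> released = new ArrayList<>();
        for (List<T> events : ready.values()) {
            released.addAll(events); // ascending event-time order
        }
        ready.clear(); // clearing the view removes those entries from byEventTime
        return released;
    }

    int buffered() {
        int n = 0;
        for (List<T> events : byEventTime.values()) {
            n += events.size();
        }
        return n;
    }
}
```

For events at 1, 5, 5 and 12, releaseUpTo(10) returns the first three in event-time order and keeps the one at 12.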
>>>>>>>>>
>>>>>>>>> Regards.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vishal,
>>>>>>>>>>
>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>> I would recommend using a ProcessFunction to enrich records with
>>>>>>>>>> the current watermark before passing them into the window operator.
>>>>>>>>>> The context object of the processElement() method gives access to
>>>>>>>>>> the current watermark and timestamp of a record.
>>>>>>>>>>
>>>>>>>>>> Please note that watermarks are not deterministic but may depend
>>>>>>>>>> on the order in which parallel inputs are consumed by an operator.
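
A hypothetical plain-Java sketch of why the observed watermark depends on input order (not Flink code; the class name is made up): an operator with several input channels keeps the highest watermark seen per channel and uses the minimum across channels as its own. The same record sees a different current watermark depending on whether it is processed before or after another channel's watermark arrives.

```java
import java.util.Arrays;

// Hypothetical model (NOT Flink code) of an operator with several input channels:
// it tracks the highest watermark per channel, and its own watermark is the minimum.
// Which value a record observes therefore depends on how the inputs interleave.
final class MultiInputWatermarkModel {
    private final long[] channelWatermarks;

    MultiInputWatermarkModel(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```

If both channels' watermarks (10 and 20) arrive before a record, the record sees 10; if the record is processed before channel 1's watermark, it sees Long.MIN_VALUE, even though both orders end at the same watermark.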
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> An addendum
>>>>>>>>>>>
>>>>>>>>>>> Is the element reference IN in onElement(IN element, ..) in
>>>>>>>>>>> Trigger<IN, ..> the same as the IN provided to add(IN value) in the
>>>>>>>>>>> Accumulator<IN, ..>? It seems that mutations to IN made in
>>>>>>>>>>> onElement() are not visible to the Accumulator that carries it as
>>>>>>>>>>> a previous-element reference, even in the next invocation of add().
>>>>>>>>>>> This seems to happen only in distributed mode, which makes sense only
>>>>>>>>>>> if these references point to different objects.
>>>>>>>>>>>
>>>>>>>>>>> The pipeline:
>>>>>>>>>>>
>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>>>>> .aggregate(
>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>             }
>>>>>>>>>>>             .....
>>>>>>>>>>>
>>>>>>>>>>> The Trigger:
>>>>>>>>>>>
>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
>>>>>>>>>>>         extends Trigger<T, W> {
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>>>>             TriggerContext ctx) throws Exception {
>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>
>>>>>>>>>>>         .
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>>>>>> TriggerContext. The sequence of execution is:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. call to add() in the accumulator for the window, saving
>>>>>>>>>>>> the POJO reference in the Accumulator.
>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>>
>>>>>>>>>>>> The next add() call should see the last reference and any
>>>>>>>>>>>> mutation done in step 3.
>>>>>>>>>>>>
>>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster,
>>>>>>>>>>>> as in I have access to the mutation made by onElement() in the POJO
>>>>>>>>>>>> in the subsequent add(), but not on a distributed cluster. The specific
>>>>>>>>>>>> question I had is whether add() on a supplied accumulator of a window
>>>>>>>>>>>> and the onElement() method of the trigger on that window are inline
>>>>>>>>>>>> executions on the same thread, or whether there is any
>>>>>>>>>>>> serialization/deserialization or IPC that causes this divergence
>>>>>>>>>>>> (local versus distributed).
>>>>>>>>>>>>
>>>>>>>>>>>> Regards.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
