That's correct: removing timers is not supported in ProcessFunction. I am
not sure why it is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers
and keep a ValueState that stores the one valid timestamp for which the
onTimer() method should actually do its work.
When a timer fires and calls onTimer(), the method first checks whether the
timer's timestamp matches the stored one and returns immediately if it does not.
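
To make the guard concrete, here is a minimal sketch in plain Java. It is a
model, not the Flink API: the GuardedTimers class, the TreeSet standing in for
the timer service, and the validTimestamp field standing in for the ValueState
are all assumptions made for illustration, and it models a single key only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model of the "valid timestamp" guard; none of these names are Flink API. */
class GuardedTimers {
    // Stand-in for the timer service: timers can be registered but never removed.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Stand-in for the ValueState<Long> holding the only timestamp allowed to act.
    private Long validTimestamp = null;
    // Records the timestamps for which onTimer() actually did its work.
    private final List<Long> executed = new ArrayList<>();

    /** Registers a new timer and marks it as the only valid one. */
    public void schedule(long timestamp) {
        timers.add(timestamp);
        validTimestamp = timestamp;
    }

    /** Fires every timer whose timestamp is <= the watermark, in timestamp order. */
    public void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        // Stale timer: a later schedule() call superseded it, so leave immediately.
        if (validTimestamp == null || validTimestamp != timestamp) {
            return;
        }
        executed.add(timestamp);
        validTimestamp = null;
    }

    public List<Long> executed() {
        return executed;
    }
}
```

In this model, scheduling 10 and then 20 leaves both timers registered, but
only the one at 20 does any work; the one at 10 still fires and returns early.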
If you want to fire on the next watermark, another trick is to register
timers on (currentWatermark + 1). Since there is only one timer per key and
timestamp, repeated registrations collapse into a single timer that simply
gets overwritten. That timer is called as soon as the watermark advances.
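
A rough plain-Java model of the (currentWatermark + 1) trick, again not the
Flink API: the NextWatermarkTimers class and its methods are hypothetical, the
set of timestamps stands in for the per-key timer storage, and only a single
key is modelled.

```java
import java.util.TreeSet;

/** Plain-Java model of timer de-duplication; none of these names are Flink API. */
class NextWatermarkTimers {
    // A set keeps at most one entry per timestamp, so repeated registrations collapse.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark;
    private int fireCount = 0;

    NextWatermarkTimers(long initialWatermark) {
        this.currentWatermark = initialWatermark;
    }

    /** Called once per element: asks to be woken up when the watermark next advances. */
    public void processElement() {
        timers.add(currentWatermark + 1);
    }

    /** Moves the watermark forward and fires every timer that is now due. */
    public void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            fireCount++;
        }
    }

    public int pendingTimers() {
        return timers.size();
    }

    public int fireCount() {
        return fireCount;
    }
}
```

Three elements arriving between two watermarks leave exactly one pending timer
in this model, and it fires once when the watermark moves forward.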

As for the performance of the timer service: AFAIK, all methods that work
with some kind of timer use this service, so there is not much choice.

2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> And that further raises the question: how performant is the Timer Service? I
> tried to peruse the architecture behind it but could not find a
> definite clue. Is it a scheduled service, and if yes, how many threads, etc.?
>
> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Makes sense. Did a first stab at using ProcessFunction. The TimerService
>> exposed by the Context does not have a remove-timer method. Is it primarily
>> because a PriorityQueue is the storage and removal from a PriorityQueue is
>> expensive? The TriggerContext does expose another version that has removal
>> abilities, so I was wondering why this dissonance...
>>
>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> it is not guaranteed that add() and onElement() receive the same object,
>>> and even if they do it is not guaranteed that a mutation of the object in
>>> onElement() has an effect. The object might have been serialized and stored
>>> in RocksDB.
>>> Hence, elements should not be modified in onElement().
>>>
>>> Have you considered implementing the operation completely in a
>>> ProcessFunction instead of a session window?
>>> This might be more code but easier to design and reason about because
>>> there is no interaction of window assigner, trigger, and window function.
>>>
>>>
>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> I guess
>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>
>>>> is where we could fashion what is emitted. Again, for us it seems
>>>> natural to use the WM to materialize micro batches with "approximate" order
>>>> (and no, I am not a fan of Spark micro batches :)). Any pointers as to how we
>>>> could write an implementation that allows for "up till WM emission" through
>>>> a trigger on a Session Window would be very helpful. In essence, I believe
>>>> that it is crucial for any "funnel" analysis.
>>>>
>>>> Something like
>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>
>>>> I know I am simplifying this and there has to be more to it...
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> The Trigger in this case would be some count-based Trigger... Again,
>>>>> the motive is to keep the state lean, as we want to search for patterns,
>>>>> sorted on event time, in the incoming sessionized (and thus of
>>>>> nondeterministic length) stream...
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> For example, this would have worked perfectly if it did not complain
>>>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>>>> the trim-up-to-watermark behavior we desire (a reduce call after telling
>>>>>> it the current WM).
>>>>>>
>>>>>> public class SessionProcessWindow
>>>>>>         extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>
>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>
>>>>>>     @Override
>>>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>>>             Collector<Session> out) throws Exception {
>>>>>>
>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>         for (Event e : elements) {
>>>>>>             s.add(e);
>>>>>>         }
>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>         s.reduce();
>>>>>>         out.collect(s);
>>>>>>         session.update(s);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void clear(Context context){
>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>         session.clear();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>
>>>>>>>  I think that does not work, as it is the WM of the Window Operator
>>>>>>> that is needed to make deterministic decisions, rather than the WM of an
>>>>>>> operator that precedes the Window? This is doable using a
>>>>>>> ProcessWindowFunction with state, but only in the case of non-mergeable
>>>>>>> windows.
>>>>>>>
>>>>>>> The best API option, I think, is a time-based Trigger that fires on
>>>>>>> every configured time progression of the WM, and a Window implementation
>>>>>>> that materializes *only data up till that WM* (it might have more data,
>>>>>>> but that data has an event time greater than the WM). I am not sure we have
>>>>>>> that option built in, and thus was asking for access to the current WM
>>>>>>> of the window operator to allow us to handle the *only data up till
>>>>>>> that WM* range retrieval using some custom data structure.
>>>>>>>
>>>>>>> Regards.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>>
>>>>>>>> the Trigger is not designed to augment records but just to control
>>>>>>>> when a window is evaluated.
>>>>>>>> I would recommend using a ProcessFunction to enrich records with
>>>>>>>> the current watermark before passing them into the window operator.
>>>>>>>> The context object of the processElement() method gives access to
>>>>>>>> the current watermark and timestamp of a record.
>>>>>>>>
>>>>>>>> Please note that watermarks are not deterministic but may depend on
>>>>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>
>>>>>>>>> An addendum
>>>>>>>>>
>>>>>>>>> Is the element reference IN in onElement(IN element, ..) of
>>>>>>>>> Trigger<IN,..> the same as the IN provided to add(IN value) in
>>>>>>>>> Accumulator<IN,..>? It seems that any mutation of IN in onElement() is
>>>>>>>>> not visible to the Accumulator that is carrying it as a previous-element
>>>>>>>>> reference, even in the next invocation of add(). This seems to happen only
>>>>>>>>> in distributed mode, which makes sense only if these references point to
>>>>>>>>> different objects.
>>>>>>>>>
>>>>>>>>> The pipeline
>>>>>>>>>
>>>>>>>>> .keyBy(keySelector)
>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>> .aggregate(
>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>
>>>>>>>>>             @Override
>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>                 return newInstance;
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>             @Override
>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>
>>>>>>>>>                 /** This method is called before onElement of the
>>>>>>>>>                     Trigger and keeps the reference to the last IN **/
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>
>>>>>>>>>             }
>>>>>>>>>             .....
>>>>>>>>>
>>>>>>>>>    The Trigger
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public class CountBasedWMAugmentationTrigger<
>>>>>>>>>         T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>>             TriggerContext ctx) throws Exception {
>>>>>>>>>
>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>
>>>>>>>>>         .
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>>>> TriggerContext. The sequence of execution is:
>>>>>>>>>>
>>>>>>>>>> 1. call to add() in the accumulator for the window, saving the
>>>>>>>>>> POJO reference in the Accumulator.
>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>
>>>>>>>>>> The next add() call should see the last reference and any
>>>>>>>>>> mutation done in step 3.
>>>>>>>>>>
>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that
>>>>>>>>>> I have access to the mutation made by onElement() on the POJO in the
>>>>>>>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>>>>>>>> I had is whether add() on a supplied accumulator on a window and the
>>>>>>>>>> onElement() method of the trigger on that window are inline executions on
>>>>>>>>>> the same thread, or whether there is any serialization/deserialization or
>>>>>>>>>> IPC that causes this divergence (local versus distributed).
>>>>>>>>>>
>>>>>>>>>> Regards.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
