That further raises the question: how performant is the TimerService? I
tried to look through the architecture behind it but could not find a
definite answer. Is it a scheduled service, and if so, how many threads does it use?

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Makes sense. I took a first stab at using a ProcessFunction. The TimerService
> exposed by the Context does not have a method to remove a timer. Is that
> primarily because a priority queue is the backing storage, and removal from
> a PriorityQueue is expensive? The TriggerContext does expose timer-removal
> methods, so I was wondering about this inconsistency...
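To make the cost question concrete: in a binary heap such as `java.util.PriorityQueue`, `add()` and `poll()` are O(log n), but `remove(Object)` first has to find the element, which is a linear scan. A common workaround is lazy deletion, sketched below in plain Java. This is a hypothetical illustration of the trade-off (the `LazyTimerQueue` class and its methods are made up for this example), not Flink's internal timer implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical heap-backed event-time timer queue with lazy deletion (not a Flink class). */
class LazyTimerQueue {
    // Min-heap of timer timestamps; may still contain stale (deleted) entries.
    private final PriorityQueue<Long> heap = new PriorityQueue<>();
    // Timestamps of timers that are currently registered; O(1) membership checks.
    private final Set<Long> live = new HashSet<>();

    /** O(log n). Registering an already-registered timestamp is a no-op. */
    void register(long timestamp) {
        if (live.add(timestamp)) {
            heap.add(timestamp);
        }
    }

    /** O(1): only forget the timer; its heap entry is skipped when it reaches the head. */
    void delete(long timestamp) {
        live.remove(timestamp);
    }

    /** Fires, in timestamp order, every live timer with timestamp <= watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek() <= watermark) {
            long timestamp = heap.poll();
            if (live.remove(timestamp)) { // false for stale (deleted) entries
                fired.add(timestamp);
            }
        }
        return fired;
    }
}
```

The price of O(1) deletion is that stale entries stay in the heap until the watermark passes them, so this trades memory for cheaper deletes.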
>
> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> it is not guaranteed that add() and onElement() receive the same object,
>> and even if they do it is not guaranteed that a mutation of the object in
>> onElement() has an effect. The object might have been serialized and stored
>> in RocksDB.
>> Hence, elements should not be modified in onElement().
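A minimal plain-Java illustration of this point, assuming a state backend that keeps a serialized copy of the element (as RocksDB does) rather than the object reference. Java serialization stands in for Flink's own serializers here, and `WmEvent` and `CopySemanticsDemo` are hypothetical names. A mutation made after the copy was taken never reaches the stored copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical record that a trigger might try to "augment" with the current watermark. */
class WmEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    final long timestamp;
    long watermark = Long.MIN_VALUE;

    WmEvent(long timestamp) {
        this.timestamp = timestamp;
    }
}

class CopySemanticsDemo {
    /** Serializes and deserializes a value, standing in for a state backend write followed by a read. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

If `add()` effectively holds `roundTrip(original)` and `onElement()` then sets `original.watermark`, the held copy still has `watermark == Long.MIN_VALUE`. With a heap-based setup in a local test, both sides may share one reference, which would explain why the mutation appears to work locally.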
>>
>> Have you considered implementing the operation entirely in a
>> ProcessFunction instead of a session window?
>> This might take more code, but it is easier to design and reason about
>> because there is no interplay between window assigner, trigger, and window function.
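A rough plain-Java simulation of what such a ProcessFunction could do per key: buffer events in (simulated) keyed state, register an event-time timer at `timestamp + gap` for each event, and emit a session when a timer fires with no newer event inside the gap. Stale timers are simply ignored when they fire, so no timer deletion is needed. Everything here (`TimerSessionizer`, the map-based "state" and "timer service", dropping late events) is a hypothetical stand-in for the Flink APIs, and the session boundaries only approximate those of `EventTimeSessionWindows`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical, Flink-free simulation of sessionization with keyed state plus event-time timers. */
class TimerSessionizer {
    private final long gap;
    private long watermark = Long.MIN_VALUE;
    // Simulated keyed state: buffered event timestamps per key, kept sorted.
    private final Map<String, TreeSet<Long>> state = new HashMap<>();
    // Simulated event-time timer service: timer timestamp -> keys with a timer at that time.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    TimerSessionizer(long gap) {
        if (gap <= 0) throw new IllegalArgumentException("gap must be positive");
        this.gap = gap;
    }

    /** Like processElement(): buffer the event and register a timer at ts + gap. */
    void processElement(String key, long ts) {
        if (ts <= watermark) return; // late event: dropped in this sketch
        state.computeIfAbsent(key, k -> new TreeSet<>()).add(ts);
        timers.computeIfAbsent(ts + gap, t -> new TreeSet<>()).add(key);
    }

    /** Fires all timers <= newWatermark in timestamp order and returns the emitted sessions. */
    List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey(), out);
            }
        }
        return out;
    }

    /** Like onTimer(): emit the session ending at timerTs - gap unless a later event extended it. */
    private void onTimer(String key, long timerTs, List<String> out) {
        TreeSet<Long> events = state.get(key);
        long sessionEnd = timerTs - gap; // timestamp of the event that registered this timer
        if (events == null || !events.subSet(sessionEnd, false, timerTs, false).isEmpty()) {
            return; // stale timer: the session was extended, so just ignore it
        }
        NavigableSet<Long> session = events.headSet(sessionEnd, true);
        if (session.isEmpty()) return; // already emitted by an earlier timer
        out.add(key + "=" + new ArrayList<>(session));
        session.clear(); // the view is backed by 'events', so this removes the emitted events
        if (events.isEmpty()) state.remove(key);
    }
}
```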
>>
>>
>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>
>>> is where we could shape what is emitted. Again, for us it seems
>>> natural to use the WM to materialize micro-batches in "approximate" order
>>> (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we
>>> could write an implementation that allows for "emit up to the WM" through
>>> a trigger on a session window would be very helpful. In essence, I believe
>>> this is crucial for any "funnel" analysis.
>>>
>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>
>>> I know I am simplifying this and there has to be more to it...
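One way to picture the "emit up to the WM" idea: a per-window buffer ordered by event time that, on each firing, releases only the events at or below the current watermark and keeps the rest for the next firing. The sketch below is plain Java with a hypothetical `WatermarkBoundedBuffer` class; it is not the WindowOperator or EvictingWindowOperator code linked above, and it leaves out how the window operator would hand it the watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical per-window buffer that materializes only data up to the watermark. */
class WatermarkBoundedBuffer<E> {
    // Event time -> elements with that timestamp, in arrival order.
    private final TreeMap<Long, List<E>> byTime = new TreeMap<>();

    void add(long eventTime, E element) {
        byTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(element);
    }

    /** Removes and returns all elements with event time <= watermark, sorted by event time. */
    List<E> drainUpTo(long watermark) {
        NavigableMap<Long, List<E>> ready = byTime.headMap(watermark, true);
        List<E> out = new ArrayList<>();
        for (List<E> sameTime : ready.values()) {
            out.addAll(sameTime);
        }
        ready.clear(); // the view is backed by byTime, so this drops the emitted elements
        return out;
    }

    /** Number of elements still waiting for the watermark to pass them. */
    int pending() {
        int n = 0;
        for (List<E> sameTime : byTime.values()) {
            n += sameTime.size();
        }
        return n;
    }
}
```

Keeping the buffer sorted by event time is what makes the "up to the WM" range retrieval cheap: it is a single `headMap` view rather than a scan over the whole window.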
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> The trigger in this case would be some count-based trigger. Again, the
>>>> motive is to keep the state lean, as we want to search for patterns,
>>>> sorted on event time, in the incoming sessionized (and thus of
>>>> nondeterministic length) stream.
>>>>
>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> For example, this would have worked perfectly if it did not complain
>>>>> about merging windows and per-window state. The Session class here
>>>>> encapsulates the trim-up-to-watermark behavior we want (a reduce() call
>>>>> after telling it the current WM):
>>>>>
>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>
>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>
>>>>>     @Override
>>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>>                         Collector<Session> out) throws Exception {
>>>>>         // Per-window state: this is what is rejected for merging (session) windows.
>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>         Session s = session.value();
>>>>>         if (s == null) {
>>>>>             s = new Session();
>>>>>         }
>>>>>         for (Event e : elements) {
>>>>>             s.add(e);
>>>>>         }
>>>>>         // Tell the session the current watermark, then trim everything up to it.
>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>         s.reduce();
>>>>>         out.collect(s);
>>>>>         session.update(s);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void clear(Context context) {
>>>>>         // Drop the per-window state when the window is purged.
>>>>>         context.windowState().getState(sessionState).clear();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Hello Fabian, thank you for the response.
>>>>>>
>>>>>> I don't think that works, as it is the WM of the window operator
>>>>>> that is needed to make deterministic decisions, rather than the WM of an
>>>>>> operator that precedes the window. This is doable with a
>>>>>> ProcessWindowFunction using state, but only for non-mergeable
>>>>>> windows.
>>>>>>
>>>>>> The best API option, I think, is a time-based trigger that fires on every
>>>>>> configured progression of the WM, and a window implementation that
>>>>>> materializes *only data up to that WM* (the window might hold more data,
>>>>>> but that data has event time greater than the WM). I am not sure there is
>>>>>> such a built-in option, which is why I was asking for access to the window
>>>>>> operator's current WM, so we can handle the "*only data up to that WM*"
>>>>>> range retrieval with a custom data structure.
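The firing side of that proposal can be sketched as plain-Java decision logic: arm a firing at the next interval boundary after the first element, and fire whenever the watermark reaches that boundary. `WatermarkProgressTrigger` and its methods are hypothetical; Flink's built-in `ContinuousEventTimeTrigger`, which fires periodically based on event time, may be worth checking as a starting point before writing a custom trigger.

```java
/** Hypothetical trigger logic: fire each time the watermark crosses the next interval boundary. */
class WatermarkProgressTrigger {
    private final long interval;
    private long nextFireAt = Long.MIN_VALUE; // unset until the first element arrives

    WatermarkProgressTrigger(long interval) {
        if (interval <= 0) throw new IllegalArgumentException("interval must be positive");
        this.interval = interval;
    }

    /** Per element: arm the first firing at the boundary after the element's timestamp. */
    void onElement(long timestamp) {
        if (nextFireAt == Long.MIN_VALUE) {
            nextFireAt = alignUp(timestamp);
        }
    }

    /** On watermark progress: returns true if the window should fire now. */
    boolean onWatermark(long watermark) {
        if (nextFireAt == Long.MIN_VALUE || watermark < nextFireAt) {
            return false;
        }
        // Re-arm past the current watermark, so a large jump fires only once.
        nextFireAt = alignUp(watermark);
        return true;
    }

    /** Smallest multiple of interval strictly greater than t (assumes non-negative t). */
    private long alignUp(long t) {
        return (t / interval + 1) * interval;
    }
}
```

On each `true` result, the window would materialize only the events with event time up to the watermark passed in, leaving later events for the next firing.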
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> the Trigger is not designed to augment records but just to control
>>>>>>> when a window is evaluated.
>>>>>>> I would recommend using a ProcessFunction to enrich records with
>>>>>>> the current watermark before passing them into the window operator.
>>>>>>> The context object of the processElement() method gives access to
>>>>>>> the current watermark and timestamp of a record.
>>>>>>>
>>>>>>> Please note that watermarks are not deterministic but may depend on
>>>>>>> the order in which parallel inputs are consumed by an operator.
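A simplified plain-Java model of this non-determinism: an operator with several input channels uses the minimum of the latest watermark from each channel, so the watermark a record observes depends on how watermark messages and records from different channels interleave. `MultiInputWatermark` is a hypothetical name, and the model ignores details such as idle inputs.

```java
import java.util.Arrays;

/** Hypothetical model of a multi-input operator's watermark: min over channels, never decreasing. */
class MultiInputWatermark {
    private final long[] perChannel;
    private long current = Long.MIN_VALUE;

    MultiInputWatermark(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE); // no watermark received yet
    }

    /** Records a watermark from one channel and returns the operator's watermark afterwards. */
    long onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : perChannel) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min);
        return current;
    }

    /** The watermark a record processed right now would observe. */
    long current() {
        return current;
    }
}
```

With two channels that both deliver watermarks 5, then 10 (channel 0) and 20 (channel 1), a record processed after both later watermarks observes 10, while the same record processed between them observes 5.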
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> An addendum:
>>>>>>>>
>>>>>>>> Is the element reference IN in onElement(IN element, ...) of
>>>>>>>> Trigger<IN, ...> the same object as the IN provided to add(IN value) of
>>>>>>>> Accumulator<IN, ...>? It seems that any mutation to IN in onElement() is
>>>>>>>> not visible to the Accumulator that holds it as a reference to the
>>>>>>>> previous element, when it is used in the next invocation of add(). This
>>>>>>>> seems to happen only in distributed mode, which makes sense only if these
>>>>>>>> references point to different objects.
>>>>>>>>
>>>>>>>> The pipeline:
>>>>>>>>
>>>>>>>> .keyBy(keySelector)
>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>> .aggregate(
>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>                 // Fresh copy of the template accumulator, with its local value reset.
>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>                 return newInstance;
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>                 // Called before the Trigger's onElement(); the accumulator
>>>>>>>>                 // keeps a reference to the last IN.
>>>>>>>>                 accumulator.add(value);
>>>>>>>>             }
>>>>>>>>             .....
>>>>>>>>
>>>>>>>> The trigger:
>>>>>>>>
>>>>>>>> public class CountBasedWMAugmentationTrigger<
>>>>>>>>         T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>                                    TriggerContext ctx) throws Exception {
>>>>>>>>         // The element T is mutated to carry the watermark.
>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>
>>>>>>>>         .
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I want to augment a POJO in the Trigger's onElement() method,
>>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>>> TriggerContext. The sequence of execution is:
>>>>>>>>>
>>>>>>>>> 1. add() is called on the window's accumulator, which saves the
>>>>>>>>> POJO reference.
>>>>>>>>> 2. onElement() is called on the Trigger.
>>>>>>>>> 3. The watermark is set on the POJO.
>>>>>>>>>
>>>>>>>>> The next add() call should see the last reference, including any
>>>>>>>>> mutation done in step 3.
>>>>>>>>>
>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that
>>>>>>>>> the subsequent add() sees the mutation made to the POJO by onElement(),
>>>>>>>>> but not on a distributed cluster. My specific question is whether add()
>>>>>>>>> on a window's accumulator and the onElement() method of that window's
>>>>>>>>> trigger are executed inline on the same thread, or whether some
>>>>>>>>> serialization/deserialization or IPC causes this divergence (local
>>>>>>>>> versus distributed).
>>>>>>>>>
>>>>>>>>> Regards.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
