I guess
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could customize what is emitted. Again, for us it seems
natural to use the WM to materialize micro-batches with "approximate" order
(and no, I am not a fan of Spark micro-batches :)). Any pointers on how we
could write an implementation that allows "up till WM" emission through a
trigger on a session window would be very helpful. In essence, I believe
this is crucial for any "funnel" analysis.

Something like
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346

I know I am simplifying this and there has to be more to it...
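A minimal sketch of the "only data up till that WM" range retrieval, written in plain Java rather than against Flink's internal WindowOperator. The class and method names (WatermarkBoundedBuffer, pollUpTo) are hypothetical. Events are kept in a TreeMap keyed by event time, and each firing removes and returns only the events whose timestamp is at or before the watermark, oldest first, so the emitted prefix no longer occupies state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not a Flink class): buffers events by event time and
 * releases only those with timestamp <= the current watermark, in event-time order.
 */
final class WatermarkBoundedBuffer<E> {
    // Event time -> events with that timestamp (arrival order kept for ties).
    private final TreeMap<Long, List<E>> byTime = new TreeMap<>();

    void add(long eventTime, E event) {
        byTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    /** Removes and returns all events with eventTime <= watermark, oldest first. */
    List<E> pollUpTo(long watermark) {
        List<E> out = new ArrayList<>();
        // headMap(..., true) is an inclusive, ascending view over the backing map.
        Map<Long, List<E>> ready = byTime.headMap(watermark, true);
        for (List<E> events : ready.values()) {
            out.addAll(events);
        }
        ready.clear(); // clearing the view trims the emitted prefix from byTime
        return out;
    }

    /** Number of events still buffered (later than every watermark polled so far). */
    int size() {
        int n = 0;
        for (List<E> events : byTime.values()) {
            n += events.size();
        }
        return n;
    }
}
```

Events later than the watermark stay buffered for a later firing, which keeps the state bounded by the out-of-orderness rather than by the session length.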




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> The trigger in this case would be some count-based trigger. Again, the
> motive is to keep the state lean, as we want to search for patterns,
> sorted on event time, in the incoming sessionized (and thus of
> nondeterministic length) stream.
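For reference, Flink's built-in CountTrigger (`CountTrigger.of(n)`) fires once n elements have arrived for a window, resets its count, and supports merging windows such as session windows by summing the counts of the merged windows. The plain-Java model below only illustrates that counting and merge logic; CountFiring and its methods are hypothetical names, not Flink API.

```java
/**
 * Plain-Java model (not Flink code) of count-based firing: every maxCount
 * elements the window fires and the counter starts over.
 */
final class CountFiring {
    private final long maxCount;
    private long count; // elements seen since the last firing

    CountFiring(long maxCount) {
        if (maxCount <= 0) {
            throw new IllegalArgumentException("maxCount must be > 0");
        }
        this.maxCount = maxCount;
    }

    /** Called once per element; returns true when this element causes a firing. */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // reset so the next firing needs another maxCount elements
            return true;
        }
        return false;
    }

    /** When two session windows merge, their pending counts are added together. */
    void mergeFrom(CountFiring other) {
        count += other.count;
        other.count = 0;
    }

    long pending() {
        return count;
    }
}
```

Note that a merge only combines the counts; as modeled here, the merged window fires on its next element once the combined count reaches maxCount.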
>
> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> For example, this would have worked perfectly if it did not complain about
>> MergeableWindow and state. The Session class here encapsulates the
>> trim-up-to-watermark behavior we desire (a reduce() call after telling it
>> the current WM).
>>
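>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.util.Collector;
>>
>> // Event and Session are our own classes. Per-window state
>> // (context.windowState()) is rejected for merging windows such as
>> // session windows, which is the complaint mentioned above.
>> public class SessionProcessWindow
>>         extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>
>>     private static final ValueStateDescriptor<Session> sessionState =
>>             new ValueStateDescriptor<>("session", Session.class);
>>
>>     @Override
>>     public void process(String key, Context context, Iterable<Event> elements,
>>                         Collector<Session> out) throws Exception {
>>
>>         // Session carried across firings of the same window.
>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>         Session s = session.value();
>>         if (s == null) {
>>             s = new Session();
>>         }
>>         for (Event e : elements) {
>>             s.add(e);
>>         }
>>         // Tell the session the current WM, then trim up to it.
>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>         s.reduce();
>>         out.collect(s);
>>         session.update(s);
>>     }
>>
>>     @Override
>>     public void clear(Context context) {
>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>         session.clear();
>>     }
>> }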
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hello Fabian, thank you for the response.
>>>
>>> I don't think that works: what we need in order to make deterministic
>>> decisions is the WM of the window operator itself, not the WM of an
>>> operator that precedes the window. This is doable with a
>>> ProcessWindowFunction and state, but only for non-mergeable windows.
>>>
>>> The best API option, I think, is a TimeBaseTrigger that fires every time
>>> the WM progresses by a configured interval, plus a window implementation
>>> that materializes *only data up till that WM* (the window might hold more
>>> data, but that data has event time greater than the WM). I am not sure we
>>> have such a built-in option, which is why I was asking for access to the
>>> current WM of the window operator, so that we can handle the "*only data
>>> up till that WM*" range retrieval using some custom data structure.
>>>
>>> Regards.
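Flink already ships a ContinuousEventTimeTrigger that fires periodically based on a given event-time interval. The sketch below models only the firing decision for "fire every configured progression of the WM" in plain Java. WatermarkIntervalFiring is a hypothetical name, and aligning the boundaries to multiples of the interval is an assumption made for illustration.

```java
/**
 * Plain-Java model (not Flink code): fires whenever the watermark reaches the
 * next interval boundary; several boundaries crossed at once yield one firing.
 */
final class WatermarkIntervalFiring {
    private final long interval;
    private long nextFire; // next aligned boundary the watermark must reach

    WatermarkIntervalFiring(long interval, long firstEventTime) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be > 0");
        }
        this.interval = interval;
        this.nextFire = alignUp(firstEventTime);
    }

    /** Smallest multiple of interval strictly greater than t. */
    private long alignUp(long t) {
        return Math.floorDiv(t, interval) * interval + interval;
    }

    /** Called on each watermark advance; returns true if the window should fire. */
    boolean onWatermark(long watermark) {
        if (watermark < nextFire) {
            return false;
        }
        nextFire = alignUp(watermark); // skip any boundaries the watermark jumped over
        return true;
    }
}
```

Each firing would then emit only the events at or before the watermark, leaving later events in state.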
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> the Trigger is not designed to augment records but only to control when
>>>> a window is evaluated.
>>>> I would recommend using a ProcessFunction to enrich records with the
>>>> current watermark before passing them into the window operator.
>>>> The Context object of the processElement() method gives access to the
>>>> current watermark and the timestamp of a record.
>>>>
>>>> Please note that watermarks are not deterministic but may depend on the
>>>> order in which parallel inputs are consumed by an operator.
>>>>
>>>> Best, Fabian
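Why the observed watermark depends on input order: an operator with several parallel input channels advances its watermark to the minimum of the latest watermarks received on each channel. The simplified model below (a hypothetical class, not Flink source) shows that the same element observes a different watermark depending on whether it is processed before or after the slower channel's watermark arrives.

```java
import java.util.Arrays;

/**
 * Simplified model: an operator's current watermark is the minimum of the
 * latest watermark received on each of its input channels.
 */
final class MultiInputWatermark {
    private final long[] channelWatermarks;

    MultiInputWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one channel and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        // Watermarks are monotonic per channel, so a smaller value is ignored.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return current();
    }

    long current() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```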
>>>>
>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> An addendum:
>>>>>
>>>>> Is the element reference IN in onElement(IN element, ...) of
>>>>> Trigger<IN, ...> the same object as the IN provided to add(IN value) of
>>>>> Accumulator<IN, ...>? It seems that mutations made to IN in onElement()
>>>>> are not visible to the accumulator, which holds it as the previous
>>>>> element reference, in the next invocation of add(). This seems to happen
>>>>> only in distributed mode, which makes sense only if these references
>>>>> point to different objects.
>>>>>
>>>>> The pipeline:
>>>>>
>>>>> .keyBy(keySelector)
>>>>> .window(EventTimeSessionWindows.withGap(gap))
>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>> .aggregate(
>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>
>>>>>             @Override
>>>>>             public ACC createAccumulator() {
>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>                 newInstance.resetLocal();
>>>>>                 return newInstance;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public void add(IN value, ACC accumulator) {
>>>>>                 // Called before the Trigger's onElement(); the accumulator
>>>>>                 // keeps a reference to the last IN.
>>>>>                 accumulator.add(value);
>>>>>             }
>>>>>             .....
>>>>>
>>>>> The Trigger:
>>>>>
>>>>> public class CountBasedWMAugmentationTrigger<
>>>>>         T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>                                    TriggerContext ctx) throws Exception {
>>>>>
>>>>>         // The element T is mutated to carry the watermark.
>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>
>>>>>         ...
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> I want to augment a POJO in the Trigger's onElement() method,
>>>>>> specifically to supply the POJO with the watermark from the
>>>>>> TriggerContext. The sequence of execution is:
>>>>>>
>>>>>> 1. Call add() in the accumulator for the window and save the POJO
>>>>>>    reference in the accumulator.
>>>>>> 2. Call onElement() on the Trigger.
>>>>>> 3. Set the watermark on the POJO.
>>>>>>
>>>>>> The next add() call should see the last reference, including any
>>>>>> mutation done in step 3.
>>>>>>
>>>>>> That works in a local test case using LocalFlinkMiniCluster (I can see
>>>>>> the mutation made by onElement() on the POJO in the subsequent add()),
>>>>>> but not on a distributed cluster. The specific question I have is
>>>>>> whether add() on the accumulator supplied to a window and the
>>>>>> onElement() method of the trigger on that window execute inline, on the
>>>>>> same thread, or whether there is some serialization/deserialization or
>>>>>> IPC that causes this divergence (local versus distributed).
>>>>>>
>>>>>> Regards.
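One way such a local/distributed divergence can arise, stated here as an assumption rather than a diagnosis of this job: whether a mutation made after add() is visible depends on whether the accumulator state holds a live object reference or a serialized copy. Heap-based state can keep the reference, whereas state kept in serialized form (as with the RocksDB state backend) is copied on every write. The plain-Java model below, with hypothetical class names, contrasts the two.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the IN type whose watermark the trigger sets. */
final class EventPojo {
    long watermark = Long.MIN_VALUE; // "not yet set"
}

/** Keeps the live reference, like an on-heap accumulator that stores the element. */
final class ReferenceHoldingState {
    private EventPojo last;

    void add(EventPojo value) { last = value; }

    EventPojo lastAdded() { return last; }
}

/** Keeps only serialized bytes, like state stored in serialized form. */
final class SerializedState {
    private byte[] bytes;

    void add(EventPojo value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeLong(value.watermark); // snapshot of the object at add() time
            out.flush();
            bytes = bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    EventPojo lastAdded() {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            EventPojo copy = new EventPojo();
            copy.watermark = in.readLong();
            return copy; // a new object, unaffected by later mutations of the original
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If this is the cause, attaching the watermark before the window operator, as Fabian suggests in his reply, avoids relying on mutations made inside the trigger.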
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
