For example, this would have worked perfectly if it did not complain about
MergeableWindow and state. The Session class here encapsulates the
trim-up-to-watermark behavior we desire (a reduce() call after telling it the
current WM).

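import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionProcessWindow
        extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

    private static final ValueStateDescriptor<Session> sessionState =
            new ValueStateDescriptor<>("session", Session.class);

    @Override
    public void process(String key, Context context, Iterable<Event> elements,
                        Collector<Session> out) throws Exception {

        // Per-window state: this is what gets rejected for mergeable windows.
        ValueState<Session> session = context.windowState().getState(sessionState);
        Session s = session.value() != null ? session.value() : new Session();
        for (Event e : elements) {
            s.add(e);
        }
        // Tell the session the current WM, then trim up to it.
        s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
        s.reduce();
        out.collect(s);
        session.update(s);
    }

    @Override
    public void clear(Context context) {
        ValueState<Session> session = context.windowState().getState(sessionState);
        session.clear();
    }
}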
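
To make the "trim up to watermark" idea concrete without any Flink dependency, here is a minimal sketch of what such a structure might look like. It is not the actual Session class from the job above: the class name WatermarkTrimmedSession, its Event type, and the methods setWatermark(), materialized() and pendingCount() are all hypothetical, and the sketch assumes that reduce() should hand over only events whose event time is at or below the current WM while holding back the rest.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for Session; illustrates trimming up to a watermark. */
final class WatermarkTrimmedSession {

    /** Minimal event for this sketch: only the event-time timestamp matters. */
    static final class Event {
        final long eventTime;
        final String payload;

        Event(long eventTime, String payload) {
            this.eventTime = eventTime;
            this.payload = payload;
        }
    }

    // Events not yet covered by the watermark, ordered by event time.
    private final TreeMap<Long, List<Event>> pending = new TreeMap<>();
    // Events with event time <= watermark, in the order they were released.
    private final List<Event> materialized = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    void add(Event e) {
        pending.computeIfAbsent(e.eventTime, t -> new ArrayList<>()).add(e);
    }

    /** Records the current WM; a lower value than the one already seen is ignored. */
    void setWatermark(long wm) {
        watermark = Math.max(watermark, wm);
    }

    /** Moves every pending event with eventTime <= watermark to the materialized list. */
    void reduce() {
        NavigableMap<Long, List<Event>> ready = pending.headMap(watermark, true);
        for (List<Event> bucket : ready.values()) {
            materialized.addAll(bucket);
        }
        ready.clear(); // clearing the view also removes the entries from 'pending'
    }

    /** Read-only snapshot of the events released so far. */
    List<Event> materialized() {
        return Collections.unmodifiableList(new ArrayList<>(materialized));
    }

    /** Number of events still held back because they are ahead of the watermark. */
    int pendingCount() {
        int n = 0;
        for (List<Event> bucket : pending.values()) {
            n += bucket.size();
        }
        return n;
    }
}
```

In the window function above, the equivalent of setWatermark(context.currentWatermark()) followed by reduce() would run on every firing, so whatever is emitted depends only on the WM seen by the window operator.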




On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Hello Fabian, Thank you for the response.
>
> I think that does not work, since it is the WM of the window operator that
> is needed to make deterministic decisions, rather than the WM of an operator
> that precedes the window? This is doable with a ProcessWindowFunction using
> state, but only in the case of non-mergeable windows.
>
> The best API option, I think, is a TimeBaseTrigger that fires on every
> configured time progression of the WM, and a Window implementation that
> materializes *only data up till that WM* (it might hold more data, but
> that data has an event time greater than the WM). I am not sure we have that
> as a built-in option, and thus was asking for access to the current WM of the
> window operator, to allow us to handle the "*only data up till that WM*" range
> retrieval using some custom data structure.
>
> Regards.
>
>
>
>
> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> the Trigger is not designed to augment records but just to control when a
>> window is evaluated.
>> I would recommend using a ProcessFunction to enrich records with the
>> current watermark before passing them into the window operator.
>> The context object of the processElement() method gives access to the
>> current watermark and timestamp of a record.
>>
>> Please note that watermarks are not deterministic but may depend on the
>> order in which parallel inputs are consumed by an operator.
>>
>> Best, Fabian
>>
>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> An addendum
>>>
>>> Is the element reference IN in onElement(IN element, ..) in
>>> Trigger<IN,..> the same as the IN provided to add(IN value) in
>>> Accumulator<IN,..>? It seems that any mutation to IN in onElement() is
>>> not visible to the Accumulator that is carrying it as a previous element
>>> reference, even in the next invocation of add(). This seems to happen only
>>> in distributed mode, which makes sense only if these references point to
>>> different objects.
>>>
>>> The pipeline
>>>
>>> .keyBy(keySelector)
>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>> .aggregate(
>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>
>>>             @Override
>>>             public ACC createAccumulator() {
>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>                 newInstance.resetLocal();
>>>                 return newInstance;
>>>             }
>>>
>>>             @Override
>>>             public void add(IN value, ACC accumulator) {
>>>
>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>
>>>
>>>                 accumulator.add(value);
>>>
>>>             }
>>>             .....
>>>
>>>    The Trigger
>>>
>>>
>>> public class CountBasedWMAugmentationTrigger<T extends
>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
>>>         extends Trigger<T, W> {
>>>
>>>     @Override
>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>
>>>         /** The element T is mutated to carry the watermark **/
>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>
>>>         .
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> I want to augment a POJO in the Trigger's onElement() method, specifically
>>>> to supply the POJO with the watermark from the TriggerContext. The sequence
>>>> of execution is:
>>>>
>>>> 1. call add() in the accumulator for the window and save the POJO
>>>> reference in the Accumulator.
>>>> 2. call onElement() on the Trigger
>>>> 3. set the watermark on the POJO
>>>>
>>>> The next add() call should see the last reference and any mutation
>>>> done in step 3.
>>>>
>>>> That works in a local test case using LocalFlinkMiniCluster, in that I
>>>> have access to the mutation made by onElement() to the POJO in the
>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>> I have is whether add() on a supplied accumulator on a window and the
>>>> onElement() method of the trigger on that window are inline executions on
>>>> the same thread, or whether there is any serialization/deserialization or
>>>> IPC that causes this divergence (local versus distributed).
>>>>
>>>> Regards.
>>>>
>>>
>>>
>>
>
