And that raises a further question: how performant is the TimerService? I tried to look through the architecture behind it but could not find a definite clue. Is it a scheduled service, and if so, how many threads does it use, etc.?
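The thread does not answer the TimerService question. As a rough illustration of the cost model Vishal speculates about in the Dec 20 message quoted below, here is a plain-Java sketch of a heap-ordered event-time timer queue. It is not Flink's implementation, which this thread does not describe, and `EventTimeTimerQueue` and its methods are hypothetical names. Registering a timer and firing a due timer each cost O(log n), whereas `java.util.PriorityQueue.remove(Object)` is documented as linear time. That is one plausible reason an API might leave out timer deletion.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical sketch of an event-time timer queue (not Flink's code).
// It only illustrates the cost of register / fire / delete on a binary heap.
final class EventTimeTimerQueue {
    // Min-heap on timestamp: offer() and poll() are O(log n).
    private final PriorityQueue<Long> heap = new PriorityQueue<>();
    // Deduplicates timers so each timestamp is registered (and fires) once.
    private final Set<Long> registered = new HashSet<>();

    void register(long timestamp) {
        if (registered.add(timestamp)) {
            heap.offer(timestamp);
        }
    }

    // PriorityQueue.remove(Object) scans the backing array: O(n).
    void delete(long timestamp) {
        if (registered.remove(timestamp)) {
            heap.remove(timestamp);
        }
    }

    // Fires, in timestamp order, every timer whose timestamp <= watermark.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek() <= watermark) {
            long ts = heap.poll();
            registered.remove(ts);
            fired.add(ts);
        }
        return fired;
    }

    int size() {
        return heap.size();
    }
}
```

In this sketch event-time timers fire only when the watermark advances, so their cost is paid on the watermark path rather than by a separate polling thread.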
On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> Makes sense. I took a first stab at using a ProcessFunction. The TimerService
> exposed by the Context does not have a method to remove a timer. Is that
> primarily because a priority queue is the storage, and removal from a
> PriorityQueue is expensive? The TriggerContext does expose a variant with
> removal abilities, so I was wondering about this dissonance...
>
> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> it is not guaranteed that add() and onElement() receive the same object,
>> and even if they do, it is not guaranteed that a mutation of the object in
>> onElement() has an effect. The object might have been serialized and stored
>> in RocksDB.
>> Hence, elements should not be modified in onElement().
>>
>> Have you considered implementing the operation completely in a
>> ProcessFunction instead of a session window?
>> This might be more code, but it is easier to design and reason about because
>> there is no interaction of window assigner, trigger, and window function.
>>
>>
>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> I guess
>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>
>>> is where we could shape what is emitted. Again, for us it seems
>>> natural to use the WM to materialize micro-batches with "approximate" order
>>> (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we
>>> could write an implementation that allows for "up till WM" emission through
>>> a trigger on a session window would be very helpful. In essence, I believe
>>> this is crucial for any "funnel" analysis.
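Without a delete-timer call, a ProcessFunction can still implement gap-based sessions, as Fabian suggests, by letting outdated timers fire and ignoring them. The CountWithTimeoutFunction example in the Flink ProcessFunction documentation uses the same kind of check in onTimer(). The sketch below simulates that pattern in plain Java with no Flink dependency. `GapSessionizer` is hypothetical; its maps stand in for keyed ValueState and its heap stands in for the TimerService.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical plain-Java simulation of a keyed ProcessFunction that closes a
// session after gapMs of event-time inactivity. Timers are never deleted:
// outdated ones fire and are ignored.
final class GapSessionizer {
    // A registered event-time timer for one key.
    private static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final long gapMs;
    // Stand-in for per-key ValueState: the current session deadline.
    private final Map<String, Long> deadline = new HashMap<>();
    // Stand-in for per-key ValueState: number of events in the open session.
    private final Map<String, Integer> count = new HashMap<>();
    // Stand-in for the timer service: min-heap on timer timestamp.
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    GapSessionizer(long gapMs) {
        this.gapMs = gapMs;
    }

    // Analogue of processElement(): extend the session and register a timer.
    void processElement(String key, long eventTime) {
        long candidate = eventTime + gapMs;
        deadline.merge(key, candidate, Long::max);
        count.merge(key, 1, Integer::sum);
        timers.offer(new Timer(candidate, key));
    }

    // Analogue of onTimer() calls as the watermark advances.
    // Returns "key:count" for every session that closed.
    List<String> advanceWatermark(long watermark) {
        List<String> closed = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            Long current = deadline.get(t.key);
            // Outdated timer: the session was extended after it was registered.
            if (current == null || current != t.timestamp) {
                continue;
            }
            closed.add(t.key + ":" + count.remove(t.key));
            deadline.remove(t.key);
        }
        return closed;
    }
}
```

The trade-off is a few extra heap entries per key in exchange for never needing an O(n) removal.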
>>>
>>> Something like
>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>
>>> I know I am simplifying this and there has to be more to it...
>>>
>>>
>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> The Trigger in this case would be some count-based trigger. Again, the
>>>> motive is to keep the state lean, as we want to search for patterns,
>>>> sorted on event time, in the incoming sessionized (and thus
>>>> nondeterministic-length) stream.
>>>>
>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> For example, this would have worked perfectly if it did not complain
>>>>> about MergeableWindow and state. The Session class in this example
>>>>> encapsulates the trim-up-to-watermark behavior we desire (a reduce() call
>>>>> after telling it the current WM).
>>>>>
>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>
>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>
>>>>>     @Override
>>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>>                         Collector<Session> out) throws Exception {
>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>         for (Event e : elements) {
>>>>>             s.add(e);
>>>>>         }
>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>         s.reduce();
>>>>>         out.collect(s);
>>>>>         session.update(s);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void clear(Context context) {
>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>         session.clear();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Hello Fabian, thank you for the response.
>>>>>>
>>>>>> I think that does not work: isn't it the WM of the window operator that
>>>>>> is needed to make deterministic decisions, rather than that of an
>>>>>> operator that precedes the window? This is doable with a
>>>>>> ProcessWindowFunction using state, but only for non-mergeable
>>>>>> windows.
>>>>>>
>>>>>> The best API option, I think, is a TimeBaseTrigger that fires at every
>>>>>> configured progression of the WM, plus a Window implementation that
>>>>>> materializes *only data up till that WM* (it might hold more data,
>>>>>> but that data has event time greater than the WM). I am not sure there is
>>>>>> such a built-in option, and thus I was asking for access to the current WM
>>>>>> of the window operator, to let us handle the "*only data up till that WM*"
>>>>>> range retrieval using some custom data structure.
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> the Trigger is not designed to augment records but just to control
>>>>>>> when a window is evaluated.
>>>>>>> I would recommend using a ProcessFunction to enrich records with
>>>>>>> the current watermark before passing them into the window operator.
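The "only data up till that WM" range retrieval described above can be sketched with a sorted map: buffer events by event time and, when the watermark advances, drain the head of the map up to and including the watermark while later events stay buffered. `WatermarkBoundedBuffer` is a hypothetical plain-Java helper, not a Flink class. In a real job the buffer would have to live in Flink state, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical buffer that holds events sorted by event time and releases
// only those with eventTime <= watermark; later events stay buffered.
final class WatermarkBoundedBuffer<E> {
    // Event time -> events with that timestamp, in arrival order.
    private final TreeMap<Long, List<E>> byTime = new TreeMap<>();

    void add(long eventTime, E event) {
        byTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    // Removes and returns, in event-time order, everything up to and including the watermark.
    List<E> drainUpTo(long watermark) {
        NavigableMap<Long, List<E>> ready = byTime.headMap(watermark, true);
        List<E> out = new ArrayList<>();
        for (List<E> bucket : ready.values()) {
            out.addAll(bucket);
        }
        ready.clear(); // clearing the view also removes these entries from byTime
        return out;
    }

    // Number of events still waiting for the watermark to pass them.
    int pending() {
        int n = 0;
        for (List<E> bucket : byTime.values()) {
            n += bucket.size();
        }
        return n;
    }
}
```

Whatever fires the evaluation (a time-based trigger as proposed above, or a timer in a ProcessFunction) would call drainUpTo() with the current watermark.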
>>>>>>> The context object of the processElement() method gives access to
>>>>>>> the current watermark and the timestamp of a record.
>>>>>>>
>>>>>>> Please note that watermarks are not deterministic but may depend on
>>>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> An addendum:
>>>>>>>>
>>>>>>>> Is the element reference IN in onElement(IN element, ...) of
>>>>>>>> Trigger<IN, ...> the same as the IN provided to add(IN value) of
>>>>>>>> Accumulator<IN, ...>? It seems that mutations to IN in onElement()
>>>>>>>> are not visible to the Accumulator that holds it as the previous element
>>>>>>>> reference, as seen in the next invocation of add(). This seems to happen
>>>>>>>> only in distributed mode, which makes sense only if these references
>>>>>>>> point to different objects.
>>>>>>>>
>>>>>>>> The pipeline:
>>>>>>>>
>>>>>>>> .keyBy(keySelector)
>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>> .aggregate(
>>>>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public ACC createAccumulator() {
>>>>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>             newInstance.resetLocal();
>>>>>>>>             return newInstance;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void add(IN value, ACC accumulator) {
>>>>>>>>             /** This method is called before onElement() of the
>>>>>>>>                 Trigger and keeps the reference to the last IN **/
>>>>>>>>             accumulator.add(value);
>>>>>>>>         }
>>>>>>>>         .....
>>>>>>>>
>>>>>>>> The Trigger:
>>>>>>>>
>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>                                    TriggerContext ctx) throws Exception {
>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>         *element.setWaterMark(ctx.getCurrentWatermark());*
>>>>>>>>         .
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I want to augment a POJO in the Trigger's onElement() method,
>>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>>> TriggerContext.
>>>>>>>>> The sequence of execution is:
>>>>>>>>>
>>>>>>>>> 1. call add() in the accumulator for the window and save the
>>>>>>>>>    POJO reference in the Accumulator.
>>>>>>>>> 2. call onElement() on the Trigger.
>>>>>>>>> 3. set the watermark on the POJO.
>>>>>>>>>
>>>>>>>>> The next add() call should see the last reference and any
>>>>>>>>> mutation done in step 3.
>>>>>>>>>
>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster (I can
>>>>>>>>> see the mutation made by onElement() on the POJO in the subsequent
>>>>>>>>> add()), but not on a distributed cluster. My specific question
>>>>>>>>> is whether add() on a window's accumulator and the onElement()
>>>>>>>>> method of the trigger on that window are executed inline, on
>>>>>>>>> the same thread, or whether some serialization/deserialization IPC
>>>>>>>>> causes this divergence (local versus distributed).
>>>>>>>>>
>>>>>>>>> Regards.
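Fabian's explanation of the local-versus-distributed divergence can be illustrated in plain Java. A store that keeps the object reference (as an on-heap store may) makes a later mutation visible. A store that serializes the value on write (as the RocksDB state backend stores serialized bytes) does not. All names below are hypothetical, and the thread does not say which state backend each environment used, so treat this as a likely explanation rather than a confirmed one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical POJO standing in for the thread's IN type.
final class WatermarkedPojo implements Serializable {
    long watermark = Long.MIN_VALUE;
}

// Hypothetical minimal "state" abstraction.
interface PojoStore {
    void put(WatermarkedPojo p);
    WatermarkedPojo get();
}

// Keeps the reference: a mutation made after put() is visible through get().
final class ReferenceStore implements PojoStore {
    private WatermarkedPojo ref;
    public void put(WatermarkedPojo p) { ref = p; }
    public WatermarkedPojo get() { return ref; }
}

// Serializes on put(): later mutations of the original object are not seen.
final class SerializingStore implements PojoStore {
    private byte[] bytes;

    public void put(WatermarkedPojo p) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(p);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        bytes = bos.toByteArray();
    }

    public WatermarkedPojo get() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (WatermarkedPojo) in.readObject(); // a fresh copy on every read
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        } catch (ClassNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

In the thread's terms, put() plays the role of add() and the mutation after it plays the role of onElement(); only the reference-keeping store observes the watermark.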