Also, I think shouldFire() needs to take the time as an additional argument. This would help differentiate between EARLY, ON_TIME, and LATE firings.
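
To make this concrete, here is a rough, self-contained sketch in plain Java. It is not Flink's actual API: it assumes a simplified shouldFire(windowEnd, time) in place of shouldFire(W window, TriggerContext ctx), and the names TriggerSketch, Firing, Evaluator, CountAtLeast and PastEndOfWindow are made up for illustration. The only point is that once shouldFire() sees the time, the caller can tell EARLY, ON_TIME and LATE firings apart, and that onFire() resets trigger state only when the composite really fires.

```java
import java.util.Arrays;
import java.util.List;

class TriggerSketch {

    /** Hypothetical classification of a firing relative to the end of the window. */
    enum Firing { NONE, EARLY, ON_TIME, LATE }

    /** Simplified predicate-style trigger; `time` is the extra argument suggested above. */
    interface SimpleTrigger {
        void onElement(long timestamp);
        boolean shouldFire(long windowEnd, long time);
        void onFire();
    }

    /** Would fire once at least `limit` elements arrived; the counter is reset only in onFire(). */
    static final class CountAtLeast implements SimpleTrigger {
        private final long limit;
        private long count;
        CountAtLeast(long limit) { this.limit = limit; }
        public void onElement(long timestamp) { count++; }
        public boolean shouldFire(long windowEnd, long time) { return count >= limit; }
        public void onFire() { count = 0; }
    }

    /** Would fire once the given time has reached the end of the window. */
    static final class PastEndOfWindow implements SimpleTrigger {
        public void onElement(long timestamp) { }
        public boolean shouldFire(long windowEnd, long time) { return time >= windowEnd; }
        public void onFire() { }
    }

    /** Composite that fires only if every child would fire. */
    static final class All implements SimpleTrigger {
        private final List<SimpleTrigger> children;
        All(SimpleTrigger... children) { this.children = Arrays.asList(children); }
        public void onElement(long timestamp) { children.forEach(c -> c.onElement(timestamp)); }
        public boolean shouldFire(long windowEnd, long time) {
            return children.stream().allMatch(c -> c.shouldFire(windowEnd, time));
        }
        public void onFire() { children.forEach(SimpleTrigger::onFire); }
    }

    /** Stand-in for the window operator: asks the trigger, classifies the firing, calls onFire(). */
    static final class Evaluator {
        private boolean firedOnTime;
        Firing evaluate(SimpleTrigger trigger, long windowEnd, long time) {
            if (!trigger.shouldFire(windowEnd, time)) return Firing.NONE;
            trigger.onFire();
            if (time < windowEnd) return Firing.EARLY;
            // Assumption: the first firing at or after the window end counts as ON_TIME.
            if (!firedOnTime) { firedOnTime = true; return Firing.ON_TIME; }
            return Firing.LATE;
        }
    }

    public static void main(String[] args) {
        SimpleTrigger all = new All(new PastEndOfWindow(), new CountAtLeast(2));
        Evaluator evaluator = new Evaluator();
        all.onElement(10);
        all.onElement(20);
        System.out.println(evaluator.evaluate(all, 100, 50));   // NONE: time has not reached the window end
        System.out.println(evaluator.evaluate(all, 100, 100));  // ON_TIME: both children agree, counter is reset
        all.onElement(110);
        all.onElement(115);
        System.out.println(evaluator.evaluate(all, 100, 120));  // LATE: fires again after the on-time firing
    }
}
```

Whether this classification belongs in the windowOperator (as sketched here) or in the trigger itself is left open; the sketch only shows that the time argument is enough to make the distinction.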
> On Jul 26, 2016, at 11:02 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hello,
>
> This is a nice proposal that I think covers most of the cases.
> The only thing that is missing would be a method:
>
> void onFire(Window window, TriggerContext ctx)
>
> that will be responsible for doing whatever is necessary if the
> windowOperator decides to fire. You can imagine resetting
> the counter of a countTrigger to 0.
>
> As a recap, the SimpleTrigger interface should be:
>
> class SimpleTrigger {
>   void onElement(T element, long timestamp, W window, TriggerContext ctx);
>   boolean shouldFire(W window, TriggerContext ctx);
>
>   void onMerge(W window, OnMergeContext ctx);
>   void onFire(Window window, TriggerContext ctx);
>   void clear(W window, TriggerContext ctx);
> }
>
> The onMerge and onFire methods can be seen as callbacks
> and will be applied upon merge (in case of Session windows) and
> upon firing.
>
> What do you think?
>
> Kostas
>
>> On Jul 25, 2016, at 3:34 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi,
>> yes, this is essentially the solution I had in my head but I went a bit
>> further and generalized it.
>>
>> Basically, to make triggers composable they should have this interface,
>> let's call it SimpleTrigger for now:
>>
>> class SimpleTrigger {
>>   void onElement(T element, long timestamp, W window, TriggerContext ctx);
>>   boolean shouldFire(W window, TriggerContext ctx);
>>   void onMerge(W window, OnMergeContext ctx);
>>   void clear(W window, TriggerContext ctx);
>> }
>>
>> notice how onElement() cannot return a TriggerResult anymore and how
>> onEventTime() and onProcessingTime() of the currently existing Trigger
>> interface were folded into shouldFire(). Each trigger essentially becomes a
>> predicate that says at any given time whether they would fire the window.
>> Having just one method that can decide whether to fire or not makes these
>> easily composable to form complex triggers, thus enabling the trigger DSL
>> we want to implement.
>>
>> The way to go about implementing this is either to replace our current
>> Trigger interface by this new interface or to keep our more powerful
>> interface with all the customization options and have one
>> SimpleTriggerTrigger that can execute a tree of SimpleTriggers. A rough
>> sketch of this would be this:
>> https://gist.github.com/aljoscha/66b0fcab89cd2b6190a63899f461067f
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 25 Jul 2016 at 14:33 Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> This was exactly one of the problems I also found.
>>>
>>> The way I was thinking about it is the following:
>>>
>>> Conceptually, time (event and processing) advances but state is a
>>> fixed property of the window.
>>>
>>> Given this, I modified the Count trigger to also ask for the
>>> current state (count) of the window in all method (e.g. onEventTime and
>>> onProcessingTime). This way the trigger can be composed and play
>>> well with the other triggers.
>>>
>>> If you have any more ideas on that and the rest of the problems I
>>> sent in the previous email, please let me know.
>>>
>>> Kostas
>>>
>>>> On Jul 25, 2016, at 2:22 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>> These are some very interesting thoughts! I have some more, based on these:
>>>>
>>>> What happens if you have for example this Trigger:
>>>> All(Watermark.pastEndOfWindow(), Count.atLeast(10))
>>>>
>>>> When would this even fire, i.e. what are the steps that lead to this
>>>> combined trigger firing with the Trigger system that we currently have in
>>>> place?
>>>>
>>>> I have some thoughts but they are not compatible with the way we currently
>>>> handle triggers. I have to think some more, but please shoot if you have
>>>> any ideas.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>
>>>>> Forgot to say that the signature for the onFire() that I think fits should be:
>>>>>
>>>>> void onFire(Window window, TriggerContext ctx) throws Exception;
>>>>>
>>>>>> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I started working on the new triggers proposed here and so far I can see
>>>>>> two shortcomings in the current state of the triggers that do not play well
>>>>>> with the new proposals, and more specifically the composite triggers All
>>>>>> and Any.
>>>>>>
>>>>>> So here it goes:
>>>>>>
>>>>>> 1) In the document posted above, there are some new useful trigger
>>>>>> combinations (like Any and All) which allow for combining primitive
>>>>>> triggers. This decouples the TriggerResult of each individual trigger
>>>>>> from the action that is actually going to be executed. For example, an
>>>>>> All trigger may have one proposing FIRE while the other CONTINUE and the
>>>>>> final result will be CONTINUE.
>>>>>>
>>>>>> In this case, any action that should be taken by each individual trigger
>>>>>> upon firing, e.g. cleaning its state as in the case of CountTrigger,
>>>>>> should be postponed until the parent trigger (All) decides to fire.
>>>>>>
>>>>>> For this, there should be a onFire() method in each trigger that does
>>>>>> exactly that. This method should be called in the fireOrCleanup() of the
>>>>>> windowOperator, when the firing is successful.
>>>>>>
>>>>>> 2) In the current implementation, when stateful triggers, like the
>>>>>> CountTrigger, are combined in a composite Trigger (with Any or All) their
>>>>>> state is shared because the stateHandle is the same for both. To solve
>>>>>> this, the handle should become unique, BUT consistent for the same Trigger.
>>>>>> The latter implies that the handle for the same trigger after
>>>>>> a node failure, should be the same as that of its predecessor (before
>>>>>> the failure).
>>>>>>
>>>>>> Let me know your thoughts on these.
>>>>>>
>>>>>> Kostas
>>>>>>
>>>>>>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>> I'm proposing to get this small change into 1.1:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-4239 This will make our lives
>>>>>>> easier with the future proposed changes.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> these new features should make it into the 1.2 release. We are already
>>>>>>>> working on releasing 1.1 so it won't make it for that one. unfortunately.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> BTW, do you have rough timeline in term of roll out it to production?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Chen
>>>>>>>>>
>>>>>>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> Chen commented this on the doc (I'm mirroring here so everyone can follow):
>>>>>>>>>> "It would be cool to be able to access last snapshot of window states
>>>>>>>>>> before it get purged. Pipeline author might consider put it to external
>>>>>>>>>> storage and deal with late arriving events by restore corresponding
>>>>>>>>>> window."
>>>>>>>>>>
>>>>>>>>>> My answer:
>>>>>>>>>> This is partially covered by the section called "What Happens at
>>>>>>>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
>>>>>>>>>> is that the window can have one final emission if there is new data in the
>>>>>>>>>> buffers at cleanup time.
>>>>>>>>>>
>>>>>>>>>> The work on this will also depend on this proposal:
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>>>>>>> With this, the WindowFunction can get meta data about the window firing so it
>>>>>>>>>> could be informed that this is the last firing before a cleanup and that
>>>>>>>>>> there already was an earlier, on-time firing.
>>>>>>>>>>
>>>>>>>>>> Does this cover your concerns, Chen?
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sure. Currently, it looks like any element assigned to a too late window
>>>>>>>>>>> will be dropped silently😓 ?
>>>>>>>>>>>
>>>>>>>>>>> Having a late window stream imply somehow Flink needs to add one more state
>>>>>>>>>>> to window and split window state cleanup from window retirement.
>>>>>>>>>>> I would suggest as simple as adding a function in trigger called
>>>>>>>>>>> OnLateElement and always fire_purge it would enable developer aware and
>>>>>>>>>>> handle this case.
>>>>>>>>>>>
>>>>>>>>>>> Chen
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> @Chen I added a section at the end of the document regarding access to the
>>>>>>>>>>>> elements that are dropped as late. Right now, the section just mentions
>>>>>>>>>>>> that we have to do this but there is no real proposal yet for how to do it.
>>>>>>>>>>>> Only a rough sketch so that we don't forget about it.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for allowedLateness scenario.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The rationale behind is there are backfills or data issues hold in-window
>>>>>>>>>>>>> data till watermark pass end time. It cause sink write partial output.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Allow high allowedLateness threshold makes life easier to merge those
>>>>>>>>>>>>> results and overwrite partial output with correct output at sink. But yeah,
>>>>>>>>>>>>> pipeline author is at risk of blow up statebackend with huge states.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Alternatively, In some case, if sink allows read-check-merge operation,
>>>>>>>>>>>>> window can explicit call out events ingested after allowedLateness. It asks
>>>>>>>>>>>>> pipeline author mitigated these events in a way outside of flink ecosystem.
>>>>>>>>>>>>> The catch is that since everywhere in a flink job can replay and
>>>>>>>>>>>>> checkpoint, notification should somehow includes these info as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Chen
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In the effort to move the discussion to the mailing list, rather than the doc,
>>>>>>>>>>>>>> there was a comment in the doc:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> “It seems this proposal marries the allowed lateness of events and the
>>>>>>>>>>>>>> discarding of window state. In most use cases this should be sufficient,
>>>>>>>>>>>>>> but there are instances where having independent control of these may be
>>>>>>>>>>>>>> useful.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For instance, you may have a job that computes some aggregate, like a sum.
>>>>>>>>>>>>>> You may want to keep the window state around for a while, but not too long.
>>>>>>>>>>>>>> Yet you may want to continue processing late events after you discarded the
>>>>>>>>>>>>>> window state. It is possible that your stream sinks can make use of this
>>>>>>>>>>>>>> data. For instance, they may be writing to a data store that returns an
>>>>>>>>>>>>>> error if a row already exists, which allow the sink to read the existing
>>>>>>>>>>>>>> row and update it with the new data."
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To which I would like to reply:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If I understand your use-case correctly, I believe that the proposed
>>>>>>>>>>>>>> binding of the allowed lateness to the state purging does not impose any
>>>>>>>>>>>>>> problem. The lateness specifies the upper time bound, after which the state
>>>>>>>>>>>>>> will be discarded. Between the start of a window and its (end +
>>>>>>>>>>>>>> allowedLateness) you can write custom triggers that fire, purge the state,
>>>>>>>>>>>>>> or do nothing. Given this, I suppose that, at the most extreme case, you
>>>>>>>>>>>>>> can specify an allowed lateness of Long.MaxValue and do the purging of the
>>>>>>>>>>>>>> state "manually". By doing this, you remove the safeguard of letting the
>>>>>>>>>>>>>> system purge the state at some point in time, and you can do your own
>>>>>>>>>>>>>> custom state management that fits your needs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Vishnu Funny you should ask that because I have a design doc lying around.
>>>>>>>>>>>>>>> I'll open a new mail thread to not hijack this one.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I was going through the suggested improvements in window, and I have
>>>>>>>>>>>>>>>> few questions/suggestion on improvement regarding the Evictor.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) I am having a use case where I have to create a custom Evictor that will
>>>>>>>>>>>>>>>> evict elements from the window based on the value (e.g., if I have elements
>>>>>>>>>>>>>>>> are of case class Item(id: Int, type:String) then evict elements that has
>>>>>>>>>>>>>>>> type="a"). I believe this is not currently possible.
>>>>>>>>>>>>>>>> 2) this is somewhat related to 1) where there should be an option to evict
>>>>>>>>>>>>>>>> elements from anywhere in the window. not only from the beginning of the
>>>>>>>>>>>>>>>> window. (e.g., apply the delta function to all elements and remove all
>>>>>>>>>>>>>>>> those don't pass. I checked the code and evict method just returns the
>>>>>>>>>>>>>>>> number of elements to be removed and processTriggerResult just skips those
>>>>>>>>>>>>>>>> many elements from the beginning.
>>>>>>>>>>>>>>>> 3) Add an option to enables the user to decide if the eviction should
>>>>>>>>>>>>>>>> happen before the apply function or after the apply function. Currently it
>>>>>>>>>>>>>>>> is before the apply function, but I have a use case where I need to first
>>>>>>>>>>>>>>>> apply the function and evict afterward.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am doing these for a POC so I think I can modify the flink code base to
>>>>>>>>>>>>>>>> make these changes and build, but I would appreciate any suggestion on
>>>>>>>>>>>>>>>> whether these are viable changes or will there any performance issue if
>>>>>>>>>>>>>>>> these are done. Also any pointer on where to start(e.g, do I create a new
>>>>>>>>>>>>>>>> class similar to EvictingWindowOperator that extends WindowOperator?)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>>>>>> Vishnu Viswanath,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I did:
>>>>>>>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
>>>>>>>>>>>>>>>>> ;-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>> In the future, it might be good to to discussions directly on the ML and
>>>>>>>>>>>>>>>>>>> then change the document accordingly. This way everyone can follow the
>>>>>>>>>>>>>>>>>>> discussion on the ML. I also feel that Google Doc comments often don't give
>>>>>>>>>>>>>>>>>>> enough space for expressing more complex opinions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate discussion on dev@ ?