Hello,

This is a nice proposal that I think covers most of the cases.
The only thing that is missing would be a method:

    void onFire(Window window, TriggerContext ctx)

that will be responsible for doing whatever is necessary if the
windowOperator decides to fire. You can imagine resetting the counter of a
countTrigger to 0.

As a recap, the SimpleTrigger interface should be:

    class SimpleTrigger {
        void onElement(T element, long timestamp, W window, TriggerContext ctx);
        boolean shouldFire(W window, TriggerContext ctx);
        void onMerge(W window, OnMergeContext ctx);
        void onFire(Window window, TriggerContext ctx);
        void clear(W window, TriggerContext ctx);
    }

The onMerge and onFire methods can be seen as callbacks and will be applied
upon merge (in case of Session windows) and upon firing.

What do you think?

Kostas

> On Jul 25, 2016, at 3:34 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> yes, this is essentially the solution I had in my head but I went a bit
> further and generalized it.
>
> Basically, to make triggers composable they should have this interface,
> let's call it SimpleTrigger for now:
>
> class SimpleTrigger {
>   void onElement(T element, long timestamp, W window, TriggerContext ctx);
>   boolean shouldFire(W window, TriggerContext ctx);
>   void onMerge(W window, OnMergeContext ctx);
>   void clear(W window, TriggerContext ctx);
> }
>
> notice how onElement() cannot return a TriggerResult anymore and how
> onEventTime() and onProcessingTime() of the currently existing Trigger
> interface were folded into shouldFire(). Each trigger essentially becomes a
> predicate that says at any given time whether they would fire the window.
> Having just one method that can decide whether to fire or not makes these
> easily composable to form complex triggers, thus enabling the trigger DSL
> we want to implement.
>
> The way to go about implementing this is either to replace our current
> Trigger interface by this new interface or to keep our more powerful
> interface with all the customization options and have one
> SimpleTriggerTrigger that can execute a tree of SimpleTriggers.
> A rough sketch of this would be this:
> https://gist.github.com/aljoscha/66b0fcab89cd2b6190a63899f461067f
>
> Cheers,
> Aljoscha
>
> On Mon, 25 Jul 2016 at 14:33 Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
>
>> Hi Aljoscha,
>>
>> This was exactly one of the problems I also found.
>>
>> The way I was thinking about it is the following:
>>
>> Conceptually, time (event and processing) advances but state is a
>> fixed property of the window.
>>
>> Given this, I modified the Count trigger to also ask for the
>> current state (count) of the window in all methods (e.g. onEventTime and
>> onProcessingTime). This way the trigger can be composed and play
>> well with the other triggers.
>>
>> If you have any more ideas on that and the rest of the problems I
>> sent in the previous email, please let me know.
>>
>> Kostas
>>
>>> On Jul 25, 2016, at 2:22 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>> These are some very interesting thoughts! I have some more, based on
>>> these:
>>>
>>> What happens if you have for example this Trigger:
>>> All(Watermark.pastEndOfWindow(), Count.atLeast(10))
>>>
>>> When would this even fire, i.e. what are the steps that lead to this
>>> combined trigger firing with the Trigger system that we currently have in
>>> place?
>>>
>>> I have some thoughts but they are not compatible with the way we
>>> currently handle triggers. I have to think some more, but please shoot if
>>> you have any ideas.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.klou...@data-artisans.com>
>>> wrote:
>>>
>>>> Forgot to say that the signature for the onFire() that I think fits
>>>> should be:
>>>>
>>>> void onFire(Window window, TriggerContext ctx) throws Exception;
>>>>
>>>>> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas
>>>>> <k.klou...@data-artisans.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I started working on the new triggers proposed here and so far I can see
>>>>> two shortcomings in the current state of the triggers that do not play
>>>>> well with the new proposals, and more specifically the composite triggers
>>>>> All and Any.
>>>>>
>>>>> So here it goes:
>>>>>
>>>>> 1) In the document posted above, there are some new useful trigger
>>>>> combinations (like Any and All) which allow for combining primitive
>>>>> triggers. This decouples the TriggerResult of each individual trigger
>>>>> from the action that is actually going to be executed. For example, an
>>>>> All trigger may have one child proposing FIRE while the other proposes
>>>>> CONTINUE, and the final result will be CONTINUE.
>>>>>
>>>>> In this case, any action that should be taken by each individual trigger
>>>>> upon firing, e.g. cleaning its state as in the case of CountTrigger,
>>>>> should be postponed until the parent trigger (All) decides to fire.
>>>>>
>>>>> For this, there should be an onFire() method in each trigger that does
>>>>> exactly that. This method should be called in the fireOrCleanup() of the
>>>>> windowOperator, when the firing is successful.
>>>>>
>>>>> 2) In the current implementation, when stateful triggers, like the
>>>>> CountTrigger, are combined in a composite Trigger (with Any or All) their
>>>>> state is shared because the stateHandle is the same for both. To solve
>>>>> this, the handle should become unique, BUT consistent for the same
>>>>> Trigger.
>>>>> The latter implies that the handle for the same trigger after a node
>>>>> failure should be the same as that of its predecessor (before the
>>>>> failure).
>>>>>
>>>>> Let me know your thoughts on these.
>>>>>
>>>>> Kostas
>>>>>
>>>>>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> I'm proposing to get this small change into 1.1:
>>>>>> https://issues.apache.org/jira/browse/FLINK-4239 This will make our
>>>>>> lives easier with the future proposed changes.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> these new features should make it into the 1.2 release. We are already
>>>>>>> working on releasing 1.1 so they won't make it for that one,
>>>>>>> unfortunately.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>
>>>>>>>> BTW, do you have a rough timeline in terms of rolling it out to
>>>>>>>> production?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Chen
>>>>>>>>
>>>>>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> Chen commented this on the doc (I'm mirroring here so everyone can
>>>>>>>>> follow):
>>>>>>>>> "It would be cool to be able to access last snapshot of window states
>>>>>>>>> before it get purged. Pipeline author might consider put it to
>>>>>>>>> external storage and deal with late arriving events by restore
>>>>>>>>> corresponding window."
>>>>>>>>>
>>>>>>>>> My answer:
>>>>>>>>> This is partially covered by the section called "What Happens at
>>>>>>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to
>>>>>>>>> introduce is that the window can have one final emission if there is
>>>>>>>>> new data in the buffers at cleanup time.
>>>>>>>>>
>>>>>>>>> The work on this will also depend on this proposal:
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>>>>>> With this, the WindowFunction can get meta data about the window
>>>>>>>>> firing so it could be informed that this is the last firing before a
>>>>>>>>> cleanup and that there already was an earlier, on-time firing.
>>>>>>>>>
>>>>>>>>> Does this cover your concerns, Chen?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Sure. Currently, it looks like any element assigned to a too late
>>>>>>>>>> window will be dropped silently 😓?
>>>>>>>>>>
>>>>>>>>>> Having a late window stream implies that Flink somehow needs to add
>>>>>>>>>> one more state to the window and split window state cleanup from
>>>>>>>>>> window retirement.
>>>>>>>>>> I would suggest something as simple as adding a function in the
>>>>>>>>>> trigger called OnLateElement that always does fire_purge; it would
>>>>>>>>>> make developers aware of this case and let them handle it.
>>>>>>>>>>
>>>>>>>>>> Chen
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> @Chen I added a section at the end of the document regarding access
>>>>>>>>>>> to the elements that are dropped as late. Right now, the section
>>>>>>>>>>> just mentions that we have to do this but there is no real proposal
>>>>>>>>>>> yet for how to do it. Only a rough sketch so that we don't forget
>>>>>>>>>>> about it.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> +1 for allowedLateness scenario.
>>>>>>>>>>>>
>>>>>>>>>>>> The rationale behind it is that there are backfills or data issues
>>>>>>>>>>>> that hold in-window data until the watermark passes the end time.
>>>>>>>>>>>> This causes the sink to write partial output.
>>>>>>>>>>>>
>>>>>>>>>>>> Allowing a high allowedLateness threshold makes it easier to merge
>>>>>>>>>>>> those results and overwrite partial output with correct output at
>>>>>>>>>>>> the sink. But yeah, the pipeline author is at risk of blowing up
>>>>>>>>>>>> the state backend with huge states.
>>>>>>>>>>>>
>>>>>>>>>>>> Alternatively, in some cases, if the sink allows a read-check-merge
>>>>>>>>>>>> operation, the window can explicitly call out events ingested after
>>>>>>>>>>>> allowedLateness. It asks the pipeline author to mitigate these
>>>>>>>>>>>> events in a way outside of the Flink ecosystem.
>>>>>>>>>>>> The catch is that since everywhere in a Flink job can replay and
>>>>>>>>>>>> checkpoint, the notification should somehow include this info as
>>>>>>>>>>>> well.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Chen
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas
>>>>>>>>>>>> <k.klou...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the effort to move the discussion to the mailing list, rather
>>>>>>>>>>>>> than the doc, there was a comment in the doc:
>>>>>>>>>>>>>
>>>>>>>>>>>>> "It seems this proposal marries the allowed lateness of events and
>>>>>>>>>>>>> the discarding of window state. In most use cases this should be
>>>>>>>>>>>>> sufficient, but there are instances where having independent
>>>>>>>>>>>>> control of these may be useful.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For instance, you may have a job that computes some aggregate,
>>>>>>>>>>>>> like a sum.
>>>>>>>>>>>>> You may want to keep the window state around for a while, but not
>>>>>>>>>>>>> too long. Yet you may want to continue processing late events
>>>>>>>>>>>>> after you discarded the window state. It is possible that your
>>>>>>>>>>>>> stream sinks can make use of this data. For instance, they may be
>>>>>>>>>>>>> writing to a data store that returns an error if a row already
>>>>>>>>>>>>> exists, which allows the sink to read the existing row and update
>>>>>>>>>>>>> it with the new data."
>>>>>>>>>>>>>
>>>>>>>>>>>>> To which I would like to reply:
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I understand your use-case correctly, I believe that the
>>>>>>>>>>>>> proposed binding of the allowed lateness to the state purging does
>>>>>>>>>>>>> not pose any problem. The lateness specifies the upper time bound,
>>>>>>>>>>>>> after which the state will be discarded. Between the start of a
>>>>>>>>>>>>> window and its (end + allowedLateness) you can write custom
>>>>>>>>>>>>> triggers that fire, purge the state, or do nothing. Given this, I
>>>>>>>>>>>>> suppose that, in the most extreme case, you can specify an allowed
>>>>>>>>>>>>> lateness of Long.MaxValue and do the purging of the state
>>>>>>>>>>>>> "manually". By doing this, you remove the safeguard of letting the
>>>>>>>>>>>>> system purge the state at some point in time, and you can do your
>>>>>>>>>>>>> own custom state management that fits your needs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Vishnu Funny you should ask that because I have a design doc
>>>>>>>>>>>>>> lying around. I'll open a new mail thread to not hijack this one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath
>>>>>>>>>>>>>> <vishnu.viswanat...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was going through the suggested improvements in window, and I
>>>>>>>>>>>>>>> have a few questions/suggestions for improvement regarding the
>>>>>>>>>>>>>>> Evictor.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) I have a use case where I have to create a custom Evictor that
>>>>>>>>>>>>>>> will evict elements from the window based on their value (e.g.,
>>>>>>>>>>>>>>> if my elements are of case class Item(id: Int, type: String),
>>>>>>>>>>>>>>> then evict the elements that have type="a"). I believe this is
>>>>>>>>>>>>>>> not currently possible.
>>>>>>>>>>>>>>> 2) This is somewhat related to 1): there should be an option to
>>>>>>>>>>>>>>> evict elements from anywhere in the window, not only from the
>>>>>>>>>>>>>>> beginning of the window (e.g., apply the delta function to all
>>>>>>>>>>>>>>> elements and remove all those that don't pass). I checked the
>>>>>>>>>>>>>>> code, and the evict method just returns the number of elements to
>>>>>>>>>>>>>>> be removed, and processTriggerResult just skips that many
>>>>>>>>>>>>>>> elements from the beginning.
>>>>>>>>>>>>>>> 3) Add an option that enables the user to decide if the eviction
>>>>>>>>>>>>>>> should happen before the apply function or after the apply
>>>>>>>>>>>>>>> function. Currently it is before the apply function, but I have a
>>>>>>>>>>>>>>> use case where I need to first apply the function and evict
>>>>>>>>>>>>>>> afterward.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am doing these for a POC so I think I can modify the Flink code
>>>>>>>>>>>>>>> base to make these changes and build, but I would appreciate any
>>>>>>>>>>>>>>> suggestion on whether these are viable changes or whether there
>>>>>>>>>>>>>>> will be any performance issues if these are done. Also, any
>>>>>>>>>>>>>>> pointer on where to start (e.g., do I create a new class similar
>>>>>>>>>>>>>>> to EvictingWindowOperator that extends WindowOperator?)
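To make points 1) and 2) above a bit more concrete, here is a minimal sketch of value-based eviction from anywhere in the buffer. It assumes a hypothetical evictor contract that removes elements in place instead of returning a count; ValueEvictor, evictIf and Item are illustrative names only and are not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

final class EvictorSketch {

    /** Mirrors the Item(id, type) case class from the example above. */
    static final class Item {
        final int id;
        final String type;
        Item(int id, String type) { this.id = id; this.type = type; }
    }

    /** Hypothetical contract: the evictor removes elements in place instead of returning a count. */
    interface ValueEvictor<T> {
        void evict(Iterable<T> elements);
    }

    /** Builds an evictor that drops every element matching the predicate, wherever it sits. */
    static <T> ValueEvictor<T> evictIf(final Predicate<T> shouldEvict) {
        return new ValueEvictor<T>() {
            @Override
            public void evict(Iterable<T> elements) {
                Iterator<T> it = elements.iterator();
                while (it.hasNext()) {
                    if (shouldEvict.test(it.next())) {
                        it.remove(); // needs a buffer whose iterator supports remove()
                    }
                }
            }
        };
    }

    static List<Integer> demo() {
        // Stand-in for the window buffer; the matching elements are not at the front only.
        List<Item> buffer = new ArrayList<>(Arrays.asList(
                new Item(1, "a"), new Item(2, "b"), new Item(3, "a"), new Item(4, "c")));
        ValueEvictor<Item> evictor = evictIf((Item item) -> item.type.equals("a"));
        evictor.evict(buffer);
        List<Integer> remainingIds = new ArrayList<>();
        for (Item item : buffer) {
            remainingIds.add(item.id);
        }
        return remainingIds;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [2, 4]
    }
}
```

Because the evictor works on the buffer itself, the same evict() call could in principle be made either before or after the window function is applied, which is what point 3) asks for. Whether that is acceptable performance-wise for large windows is a separate question that this sketch does not answer.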
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>>>>> Vishnu Viswanath,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek
>>>>>>>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I did:
>>>>>>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
>>>>>>>>>>>>>>>> ;-)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek
>>>>>>>>>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>>>> In the future, it might be good to do discussions directly on
>>>>>>>>>>>>>>>>>> the ML and then change the document accordingly. This way
>>>>>>>>>>>>>>>>>> everyone can follow the discussion on the ML. I also feel that
>>>>>>>>>>>>>>>>>> Google Doc comments often don't give enough space for
>>>>>>>>>>>>>>>>>> expressing more complex opinions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate
>>>>>>>>>>>>>>>>> discussion on dev@?
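
Coming back to the All(Watermark.pastEndOfWindow(), Count.atLeast(10)) example from earlier in this thread, here is a minimal, self-contained sketch of how such a combination could behave once every trigger is just a shouldFire() predicate with an onFire() callback. It does not use the Flink classes: Ctx, Win, CountAtLeast, PastEndOfWindow, All and fireIfReady are hypothetical stand-ins made up for illustration, onMerge() and clear() are left out, and the count lives in a plain field where a real trigger would use per-window state.

```java
import java.util.Arrays;
import java.util.List;

final class TriggerSketch {

    /** Hypothetical stand-in for TriggerContext: it only exposes the current watermark. */
    static final class Ctx {
        long watermark = Long.MIN_VALUE;
    }

    /** Hypothetical window: on time once the watermark reaches maxTimestamp. */
    static final class Win {
        final long maxTimestamp;
        Win(long maxTimestamp) { this.maxTimestamp = maxTimestamp; }
    }

    /** SimpleTrigger as recapped in the thread; onMerge and clear are omitted for brevity. */
    interface SimpleTrigger<T> {
        void onElement(T element, long timestamp, Win window, Ctx ctx);
        boolean shouldFire(Win window, Ctx ctx);
        void onFire(Win window, Ctx ctx);
    }

    /** Plays the role of Count.atLeast(n). */
    static final class CountAtLeast<T> implements SimpleTrigger<T> {
        private final long threshold;
        private long count;
        CountAtLeast(long threshold) { this.threshold = threshold; }
        @Override public void onElement(T element, long timestamp, Win window, Ctx ctx) { count++; }
        @Override public boolean shouldFire(Win window, Ctx ctx) { return count >= threshold; }
        // Reset happens here, not in shouldFire(), so the count survives until the root fires.
        @Override public void onFire(Win window, Ctx ctx) { count = 0; }
    }

    /** Plays the role of Watermark.pastEndOfWindow(). */
    static final class PastEndOfWindow<T> implements SimpleTrigger<T> {
        @Override public void onElement(T element, long timestamp, Win window, Ctx ctx) { }
        @Override public boolean shouldFire(Win window, Ctx ctx) {
            return ctx.watermark >= window.maxTimestamp;
        }
        @Override public void onFire(Win window, Ctx ctx) { }
    }

    /** All(...): holds only when every child predicate holds; callbacks fan out to all children. */
    static final class All<T> implements SimpleTrigger<T> {
        private final List<SimpleTrigger<T>> children;
        All(List<SimpleTrigger<T>> children) { this.children = children; }
        @Override public void onElement(T element, long timestamp, Win window, Ctx ctx) {
            for (SimpleTrigger<T> child : children) child.onElement(element, timestamp, window, ctx);
        }
        @Override public boolean shouldFire(Win window, Ctx ctx) {
            for (SimpleTrigger<T> child : children) {
                if (!child.shouldFire(window, ctx)) return false;
            }
            return true;
        }
        @Override public void onFire(Win window, Ctx ctx) {
            for (SimpleTrigger<T> child : children) child.onFire(window, ctx);
        }
    }

    /** What the window operator might do after an element or timer: ask the root, then notify it. */
    static <T> boolean fireIfReady(SimpleTrigger<T> root, Win window, Ctx ctx) {
        if (!root.shouldFire(window, ctx)) return false;
        root.onFire(window, ctx);
        return true;
    }

    static boolean[] demo() {
        Ctx ctx = new Ctx();
        Win window = new Win(999L);
        SimpleTrigger<String> root = new All<String>(Arrays.<SimpleTrigger<String>>asList(
                new PastEndOfWindow<String>(), new CountAtLeast<String>(2)));
        root.onElement("a", 10L, window, ctx);
        root.onElement("b", 20L, window, ctx);
        boolean beforeWatermark = fireIfReady(root, window, ctx); // count reached, watermark not
        ctx.watermark = 999L;
        boolean atWatermark = fireIfReady(root, window, ctx);     // both hold: fire, then reset
        boolean afterReset = fireIfReady(root, window, ctx);      // count is back to 0
        return new boolean[] { beforeWatermark, atWatermark, afterReset };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [false, true, false]
    }
}
```

With a threshold of 2, the run prints [false, true, false]. The first check does not fire even though the count is reached, and because onFire() is only invoked from the root, the count is still intact when the watermark arrives. This is the reason for keeping the reset in onFire() rather than inside the child's own decision logic.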