I also think that shouldFire() has to take the time as an additional argument.
This will help differentiate between ON_TIME, EARLY, and LATE firings.
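Here is one way the extra time argument could be used. This is a minimal sketch that assumes the trigger compares the time it receives (here, the current watermark) with the window's maximum timestamp, and that it remembers whether an on-time firing already happened. FiringKind and FiringClassifier are hypothetical names, not Flink classes.

```java
// Hypothetical sketch: FiringKind and FiringClassifier are illustrative names,
// not part of Flink.
enum FiringKind { EARLY, ON_TIME, LATE }

final class FiringClassifier {
    private FiringClassifier() {}

    /**
     * Classifies a firing by comparing the time passed to shouldFire() (assumed
     * here to be the current watermark) with the window's max timestamp.
     * Assumption: the first firing at or after the end of the window is ON_TIME,
     * and every later firing of the same window is LATE.
     */
    static FiringKind classify(long windowMaxTimestamp, long currentWatermark,
                               boolean onTimeFiringHappened) {
        if (currentWatermark < windowMaxTimestamp) {
            // The watermark has not reached the end of the window yet.
            return FiringKind.EARLY;
        }
        return onTimeFiringHappened ? FiringKind.LATE : FiringKind.ON_TIME;
    }
}
```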

> On Jul 26, 2016, at 11:02 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hello,
> 
> This is a nice proposal that I think covers most of the cases.
> The only thing that is missing would be a method:
> 
>     void onFire(W window, TriggerContext ctx);
> 
> that will be responsible for doing whatever is necessary when the
> WindowOperator decides to fire, for example resetting the counter of a
> CountTrigger to 0.
> 
> As a recap, the SimpleTrigger interface should be:
> 
> interface SimpleTrigger<T, W extends Window> {
>     void onElement(T element, long timestamp, W window, TriggerContext ctx);
>     boolean shouldFire(W window, TriggerContext ctx);
> 
>     void onMerge(W window, OnMergeContext ctx);
>     void onFire(W window, TriggerContext ctx);
>     void clear(W window, TriggerContext ctx);
> }
> 
> The onMerge() and onFire() methods can be seen as callbacks that are
> invoked upon merging (in the case of session windows) and upon firing,
> respectively.
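A simplified sketch of how a count trigger could use the two callbacks. It assumes that windows are identified by String ids and that per-window counts live in a plain Map; CountCallbacksSketch is a hypothetical name, and the real TriggerContext and OnMergeContext are replaced by these stand-ins. The point is that onFire() is the only place where the count is reset, and onMerge() folds the counts of merged session windows into the new window.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of a count trigger under the proposed
// callbacks. Windows are String ids; per-window state lives in a Map instead
// of a TriggerContext.
final class CountCallbacksSketch {
    private final long threshold;
    private final Map<String, Long> counts = new HashMap<>();

    CountCallbacksSketch(long threshold) {
        this.threshold = threshold;
    }

    void onElement(String window) {
        counts.merge(window, 1L, Long::sum);
    }

    boolean shouldFire(String window) {
        return counts.getOrDefault(window, 0L) >= threshold;
    }

    // Merge callback (e.g. session windows): the resulting window's count is
    // the sum of the counts of the windows it replaces.
    void onMerge(String mergedWindow, Collection<String> mergedFrom) {
        long sum = 0L;
        for (String w : mergedFrom) {
            Long c = counts.remove(w);
            if (c != null) {
                sum += c;
            }
        }
        counts.merge(mergedWindow, sum, Long::sum);
    }

    // Fire callback: invoked only after the WindowOperator has actually fired.
    void onFire(String window) {
        counts.put(window, 0L);
    }

    void clear(String window) {
        counts.remove(window);
    }
}
```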
> 
> What do you think?
> 
> Kostas
> 
>> On Jul 25, 2016, at 3:34 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi,
>> yes, this is essentially the solution I had in my head but I went a bit
>> further and generalized it.
>> 
>> Basically, to make triggers composable they should have this interface,
>> let's call it SimpleTrigger for now:
>> 
>> interface SimpleTrigger<T, W extends Window> {
>>     void onElement(T element, long timestamp, W window, TriggerContext ctx);
>>     boolean shouldFire(W window, TriggerContext ctx);
>>     void onMerge(W window, OnMergeContext ctx);
>>     void clear(W window, TriggerContext ctx);
>> }
>> 
>> Notice how onElement() cannot return a TriggerResult anymore, and how
>> onEventTime() and onProcessingTime() of the currently existing Trigger
>> interface were folded into shouldFire(). Each trigger essentially becomes a
>> predicate that says, at any given time, whether it would fire the window.
>> Having just one method that decides whether to fire makes these triggers
>> easily composable into complex triggers, thus enabling the trigger DSL
>> we want to implement.
>> 
>> There are two ways to go about implementing this: either replace our
>> current Trigger interface with this new interface, or keep our more
>> powerful interface with all its customization options and add one
>> SimpleTriggerTrigger that can execute a tree of SimpleTriggers. A rough
>> sketch of this is here:
>> https://gist.github.com/aljoscha/66b0fcab89cd2b6190a63899f461067f
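To illustrate the second option, here is a sketch that assumes a single window and ignores state backends. A driver asks the root of the trigger tree once after every element and every watermark advance, and calls onFire() on the whole tree only when the root says yes. This is not the code in the gist. SimpleTriggerDriver, AllOf, CountAtLeast, and WatermarkPastEnd are hypothetical names. Following the suggestion at the top of this thread, shouldFire() takes the current time (the watermark) as an argument.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not the code from the gist: a tree of simple triggers
// for a single window. shouldFire() receives the current watermark.
interface SimpleTrigger {
    void onElement(long timestamp);
    boolean shouldFire(long watermark);
    void onFire();
}

final class CountAtLeast implements SimpleTrigger {
    private final long threshold;
    private long count = 0L;

    CountAtLeast(long threshold) { this.threshold = threshold; }

    public void onElement(long timestamp) { count++; }
    public boolean shouldFire(long watermark) { return count >= threshold; }
    // Reset only when the whole tree has actually fired.
    public void onFire() { count = 0L; }
}

final class WatermarkPastEnd implements SimpleTrigger {
    private final long windowMaxTimestamp;

    WatermarkPastEnd(long windowMaxTimestamp) { this.windowMaxTimestamp = windowMaxTimestamp; }

    public void onElement(long timestamp) { }
    public boolean shouldFire(long watermark) { return watermark >= windowMaxTimestamp; }
    public void onFire() { }
}

final class AllOf implements SimpleTrigger {
    private final List<SimpleTrigger> children;

    AllOf(SimpleTrigger... children) { this.children = Arrays.asList(children); }

    public void onElement(long timestamp) { children.forEach(c -> c.onElement(timestamp)); }
    public boolean shouldFire(long watermark) {
        return children.stream().allMatch(c -> c.shouldFire(watermark));
    }
    public void onFire() { children.forEach(SimpleTrigger::onFire); }
}

// Asks the root once after every element and every watermark advance.
final class SimpleTriggerDriver {
    private final SimpleTrigger root;
    private long watermark = Long.MIN_VALUE;

    SimpleTriggerDriver(SimpleTrigger root) { this.root = root; }

    boolean processElement(long timestamp) {
        root.onElement(timestamp);
        return maybeFire();
    }

    boolean advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        return maybeFire();
    }

    // Returns true if the window fired; onFire() then runs on the whole tree.
    private boolean maybeFire() {
        if (!root.shouldFire(watermark)) {
            return false;
        }
        root.onFire();
        return true;
    }
}
```

Because the leaves only answer a yes/no question, the decision to fire is made in one place, and the leaves' own cleanup (like resetting a count) happens only after that decision.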
>> 
>> Cheers,
>> Aljoscha
>> 
>> 
>> 
>> On Mon, 25 Jul 2016 at 14:33 Kostas Kloudas <k.klou...@data-artisans.com>
>> wrote:
>> 
>>> Hi Aljoscha,
>>> 
>>> This was exactly one of the problems I also found.
>>> 
>>> The way I was thinking about it is the following:
>>> 
>>> Conceptually, time (event and processing) advances but state is a
>>> fixed property of the window.
>>> 
>>> Given this, I modified the Count trigger to also ask for the
>>> current state (count) of the window in all methods (e.g. onEventTime() and
>>> onProcessingTime()). This way the trigger can be composed and play
>>> well with the other triggers.
>>> 
>>> If you have any more ideas on this or on the other problems I
>>> raised in my previous email, please let me know.
>>> 
>>> Kostas
>>> 
>>>> On Jul 25, 2016, at 2:22 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> 
>>>> These are some very interesting thoughts! I have some more, based on these:
>>>> 
>>>> What happens if you have for example this Trigger:
>>>> All(Watermark.pastEndOfWindow(), Count.atLeast(10))
>>>> 
>>>> When would this even fire, i.e. what are the steps that lead to this
>>>> combined trigger firing with the Trigger system that we currently have in
>>>> place?
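To make the question concrete, here is a toy model that feeds 12 elements to the window and then delivers the end-of-window timer. Its assumptions are spelled out in the comments, and none of the names are Flink classes. With a naive count child, the two children never answer FIRE in the same callback, so All never fires. If the count child checks its count in every callback, as Kostas describes above, All fires at the end of the window.

```java
// Toy model of the question above; none of these names are Flink classes.
// Assumptions: a result-based All fires only if every child answers FIRE in
// the same callback; the watermark child answers FIRE only from its
// event-time callback; a naive count child checks its count only in onElement().
enum Result { CONTINUE, FIRE }

final class AllTrace {
    static final long WINDOW_MAX_TIMESTAMP = 100L;
    static final long COUNT_THRESHOLD = 10L;

    private AllTrace() {}

    // Watermark.pastEndOfWindow()-like child.
    static Result watermarkOnElement() {
        return Result.CONTINUE;
    }

    static Result watermarkOnEventTime(long time) {
        return time >= WINDOW_MAX_TIMESTAMP ? Result.FIRE : Result.CONTINUE;
    }

    // Count.atLeast(10)-like child.
    static Result countOnElement(long count) {
        return count >= COUNT_THRESHOLD ? Result.FIRE : Result.CONTINUE;
    }

    // With checksStateInAllCallbacks == true, the count child also consults its
    // count from the event-time callback instead of always answering CONTINUE.
    static Result countOnEventTime(long count, boolean checksStateInAllCallbacks) {
        return checksStateInAllCallbacks ? countOnElement(count) : Result.CONTINUE;
    }

    // Feeds 12 elements before the watermark, then the end-of-window timer.
    // Returns true if the All trigger fires in any callback.
    static boolean allEverFires(boolean checksStateInAllCallbacks) {
        long count = 0L;
        boolean fired = false;
        for (int i = 0; i < 12; i++) {
            count++;
            fired |= watermarkOnElement() == Result.FIRE
                    && countOnElement(count) == Result.FIRE;
        }
        fired |= watermarkOnEventTime(WINDOW_MAX_TIMESTAMP) == Result.FIRE
                && countOnEventTime(count, checksStateInAllCallbacks) == Result.FIRE;
        return fired;
    }
}
```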
>>>> 
>>>> I have some thoughts, but they are not compatible with the way we
>>>> currently handle triggers. I have to think some more, but please shoot if
>>>> you have any ideas.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>> 
>>>> On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.klou...@data-artisans.com>
>>>> wrote:
>>>> 
>>>>> I forgot to say that I think the signature of onFire() should be:
>>>>> 
>>>>>     void onFire(W window, TriggerContext ctx) throws Exception;
>>>>> 
>>>>>> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I started working on the new triggers proposed here, and so far I can
>>>>>> see two shortcomings in the current state of the triggers that do not
>>>>>> play well with the new proposals, more specifically with the composite
>>>>>> triggers All and Any.
>>>>>> 
>>>>>> So here it goes:
>>>>>> 
>>>>>> 1) In the document posted above, there are some useful new trigger
>>>>>> combinations (like Any and All) which allow combining primitive
>>>>>> triggers. This decouples the TriggerResult of each individual trigger
>>>>>> from the action that is actually going to be executed. For example,
>>>>>> within an All trigger one child may propose FIRE while the other
>>>>>> proposes CONTINUE, and the final result will be CONTINUE.
>>>>>> 
>>>>>> In this case, any action that should be taken by an individual trigger
>>>>>> upon firing, e.g. cleaning its state as in the case of CountTrigger,
>>>>>> should be postponed until the parent trigger (All) decides to fire.
>>>>>> 
>>>>>> For this, there should be an onFire() method in each trigger that does
>>>>>> exactly that. This method should be called in the fireOrCleanup() of the
>>>>>> WindowOperator when the firing is successful.
>>>>>> 
>>>>>> 2) In the current implementation, when stateful triggers like the
>>>>>> CountTrigger are combined in a composite trigger (with Any or All),
>>>>>> their state is shared because the stateHandle is the same for both. To
>>>>>> solve this, the handle should become unique, BUT consistent for the same
>>>>>> trigger. The latter implies that, after a node failure, the handle for
>>>>>> the same trigger should be the same as that of its predecessor (before
>>>>>> the failure).
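One way to get handles that are unique but consistent is to derive each node's state name from its path in the trigger tree. This is a minimal sketch that assumes the trigger tree is rebuilt with the same shape after a failure. TriggerNode and assignStateNames() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: TriggerNode is an illustrative type, not a Flink class.
// Each node's state name is derived from its position in the trigger tree, so
// two Count triggers under the same All get different names, and the names come
// out identical when the same tree is rebuilt after a failure.
final class TriggerNode {
    final String kind;
    final List<TriggerNode> children = new ArrayList<>();
    String stateName;

    TriggerNode(String kind, TriggerNode... children) {
        this.kind = kind;
        for (TriggerNode c : children) {
            this.children.add(c);
        }
    }

    // Assigns names such as "root/All/0/Count", depth-first.
    void assignStateNames(String parentPath) {
        stateName = parentPath + "/" + kind;
        for (int i = 0; i < children.size(); i++) {
            children.get(i).assignStateNames(stateName + "/" + i);
        }
    }
}
```

A path-based name stays stable across restarts, but it would change if the user reordered the children of a composite trigger, in which case the old state would no longer be found.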
>>>>>> 
>>>>>> Let me know your thoughts on these.
>>>>>> 
>>>>>> Kostas
>>>>>> 
>>>>>> 
>>>>>>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>> 
>>>>>>> I'm proposing to get this small change into 1.1:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-4239
>>>>>>> This will make our lives easier with the proposed future changes.
>>>>>>> 
>>>>>>> What do you think?
>>>>>>> 
>>>>>>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljos...@apache.org>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> these new features should make it into the 1.2 release. We are already
>>>>>>>> working on releasing 1.1, so unfortunately they won't make it into that one.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>> 
>>>>>>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> BTW, do you have a rough timeline in terms of rolling it out to production?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Chen
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> Chen left this comment on the doc (I'm mirroring it here so everyone can
>>>>>>>>>> follow):
>>>>>>>>>> "It would be cool to be able to access the last snapshot of the window
>>>>>>>>>> state before it gets purged. The pipeline author might consider putting
>>>>>>>>>> it into external storage and dealing with late-arriving events by
>>>>>>>>>> restoring the corresponding window."
>>>>>>>>>> 
>>>>>>>>>> My answer:
>>>>>>>>>> This is partially covered by the section called "What Happens at
>>>>>>>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to
>>>>>>>>>> introduce is that the window can have one final emission if there is new
>>>>>>>>>> data in the buffers at cleanup time.
>>>>>>>>>> 
>>>>>>>>>> The work on this will also depend on this proposal:
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>>>>>>> With this, the WindowFunction can get metadata about the window firing,
>>>>>>>>>> so it could be informed that this is the last firing before a cleanup and
>>>>>>>>>> that there already was an earlier, on-time firing.
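The final emission described above can be sketched as follows. This is a minimal sketch that assumes the operator remembers whether new elements arrived since the last firing. The FiringInfo flags loosely mirror the "last firing before cleanup" and "earlier on-time firing" metadata mentioned for FLIP-2, but FiringInfo and WindowBuffer are hypothetical names, not the FLIP-2 API. For brevity the sketch records only the metadata of each emission, not the emitted result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "one final emission at cleanup time if there is new
// data". FiringInfo and WindowBuffer are illustrative names, not Flink classes.
final class FiringInfo {
    final boolean lastFiringBeforeCleanup;
    final boolean hadOnTimeFiring;

    FiringInfo(boolean lastFiringBeforeCleanup, boolean hadOnTimeFiring) {
        this.lastFiringBeforeCleanup = lastFiringBeforeCleanup;
        this.hadOnTimeFiring = hadOnTimeFiring;
    }
}

final class WindowBuffer {
    private final List<Integer> elements = new ArrayList<>();
    private boolean newDataSinceLastFiring = false;
    private boolean onTimeFired = false;
    // Metadata of every emission, in order.
    final List<FiringInfo> emitted = new ArrayList<>();

    void add(int element) {
        elements.add(element);
        newDataSinceLastFiring = true;
    }

    // Regular on-time firing, e.g. when the watermark passes the end of the window.
    void fireOnTime() {
        emitted.add(new FiringInfo(false, onTimeFired));
        onTimeFired = true;
        newDataSinceLastFiring = false;
    }

    // At cleanup time: emit one final result only if something arrived since
    // the last firing, then drop the window contents.
    void cleanup() {
        if (newDataSinceLastFiring) {
            emitted.add(new FiringInfo(true, onTimeFired));
        }
        elements.clear();
        newDataSinceLastFiring = false;
    }
}
```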
>>>>>>>>>> 
>>>>>>>>>> Does this cover your concerns, Chen?
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>> 
>>>>>>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Sure. Currently, it looks like any element assigned to a window that is
>>>>>>>>>>> already too late will be dropped silently 😓?
>>>>>>>>>>> 
>>>>>>>>>>> Having a late window stream implies that Flink somehow needs to add one
>>>>>>>>>>> more state to the window and split window state cleanup from window
>>>>>>>>>>> retirement. I would suggest something as simple as adding a function
>>>>>>>>>>> called OnLateElement to the trigger that always does a fire_purge; this
>>>>>>>>>>> would make developers aware of this case and let them handle it.
>>>>>>>>>>> 
>>>>>>>>>>> Chen
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> @Chen I added a section at the end of the document regarding access to
>>>>>>>>>>>> the elements that are dropped as late. Right now, the section just
>>>>>>>>>>>> mentions that we have to do this, but there is no real proposal yet for
>>>>>>>>>>>> how to do it. It is only a rough sketch so that we don't forget about it.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> +1 for the allowedLateness scenario.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The rationale behind it is that backfills or data issues can hold back
>>>>>>>>>>>>> in-window data until after the watermark has passed the window's end
>>>>>>>>>>>>> time, which causes the sink to write partial output.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Allowing a high allowedLateness threshold makes it easier to merge those
>>>>>>>>>>>>> results and overwrite the partial output with the correct output at the
>>>>>>>>>>>>> sink. But yes, the pipeline author is at risk of blowing up the state
>>>>>>>>>>>>> backend with huge state.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Alternatively, in some cases, if the sink allows a read-check-merge
>>>>>>>>>>>>> operation, the window can explicitly call out events ingested after
>>>>>>>>>>>>> allowedLateness. This asks the pipeline author to mitigate these events
>>>>>>>>>>>>> in a way outside of the Flink ecosystem. The catch is that, since any
>>>>>>>>>>>>> part of a Flink job can replay and checkpoint, the notification should
>>>>>>>>>>>>> somehow include this information as well.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Chen
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <k.klou...@data-artisans.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> In an effort to move the discussion from the doc to the mailing list,
>>>>>>>>>>>>>> I am bringing over a comment that was made in the doc:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> “It seems this proposal marries the allowed lateness of events and the
>>>>>>>>>>>>>> discarding of window state. In most use cases this should be
>>>>>>>>>>>>>> sufficient, but there are instances where having independent control
>>>>>>>>>>>>>> of these may be useful.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> For instance, you may have a job that computes some aggregate, like a
>>>>>>>>>>>>>> sum. You may want to keep the window state around for a while, but not
>>>>>>>>>>>>>> too long. Yet you may want to continue processing late events after
>>>>>>>>>>>>>> you have discarded the window state. It is possible that your stream
>>>>>>>>>>>>>> sinks can make use of this data. For instance, they may be writing to
>>>>>>>>>>>>>> a data store that returns an error if a row already exists, which
>>>>>>>>>>>>>> allows the sink to read the existing row and update it with the new
>>>>>>>>>>>>>> data.”
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> To which I would like to reply:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If I understand your use case correctly, I believe that the proposed
>>>>>>>>>>>>>> binding of the allowed lateness to the state purging does not pose any
>>>>>>>>>>>>>> problem. The lateness specifies the upper time bound after which the
>>>>>>>>>>>>>> state will be discarded. Between the start of a window and its
>>>>>>>>>>>>>> (end + allowedLateness), you can write custom triggers that fire, purge
>>>>>>>>>>>>>> the state, or do nothing. Given this, I suppose that, in the most
>>>>>>>>>>>>>> extreme case, you can specify an allowed lateness of Long.MaxValue and
>>>>>>>>>>>>>> purge the state "manually". By doing this, you remove the safeguard of
>>>>>>>>>>>>>> letting the system purge the state at some point in time, and you can do
>>>>>>>>>>>>>> your own custom state management that fits your needs.
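The timing rule in this reply can be written as a small sketch. The names are hypothetical, and the overflow guard is an assumption about how a very large lateness (Long.MaxValue in the email, Long.MAX_VALUE in Java) would need to be handled. Without the guard, end + allowedLateness would wrap around to a negative timestamp.

```java
// Hypothetical sketch of the rule "state lives until end + allowedLateness".
// Names are illustrative; windowMaxTimestamp is assumed to be the last
// timestamp that still belongs to the window, and allowedLateness >= 0.
final class LatenessRules {
    private LatenessRules() {}

    // Time at which the window state is purged. With a huge allowed lateness
    // the sum would overflow, so it saturates at Long.MAX_VALUE instead, and
    // the system effectively never purges on its own.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    // An element for this window is treated as too late once the watermark
    // has reached the cleanup time.
    static boolean isTooLate(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= watermark;
    }
}
```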
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> @Vishnu Funny you should ask that, because I have a design doc lying
>>>>>>>>>>>>>>> around. I'll open a new mail thread so as not to hijack this one.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <vishnu.viswanat...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I was going through the suggested improvements to windows, and I have a
>>>>>>>>>>>>>>>> few questions/suggestions regarding improvements to the Evictor.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1) I have a use case where I need to create a custom Evictor that
>>>>>>>>>>>>>>>> evicts elements from the window based on their value (e.g., if the
>>>>>>>>>>>>>>>> elements are of case class Item(id: Int, type: String), then evict the
>>>>>>>>>>>>>>>> elements that have type="a"). I believe this is not currently possible.
>>>>>>>>>>>>>>>> 2) This is somewhat related to 1): there should be an option to evict
>>>>>>>>>>>>>>>> elements from anywhere in the window, not only from the beginning of
>>>>>>>>>>>>>>>> the window (e.g., apply the delta function to all elements and remove
>>>>>>>>>>>>>>>> all those that don't pass). I checked the code: the evict method just
>>>>>>>>>>>>>>>> returns the number of elements to be removed, and processTriggerResult
>>>>>>>>>>>>>>>> just skips that many elements from the beginning.
>>>>>>>>>>>>>>>> 3) Add an option that lets the user decide whether eviction should
>>>>>>>>>>>>>>>> happen before or after the apply function. Currently it happens before
>>>>>>>>>>>>>>>> the apply function, but I have a use case where I need to apply the
>>>>>>>>>>>>>>>> function first and evict afterward.
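The three requests can be covered by a minimal sketch that assumes the window contents are available as a mutable List. A predicate decides which elements to drop from anywhere in the buffer (requests 1 and 2), and a mode decides whether eviction runs before or after the window function (request 3). ValueEvictor and EvictionMode are hypothetical names, not Flink's Evictor interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch; ValueEvictor and EvictionMode are not Flink APIs.
enum EvictionMode { BEFORE_APPLY, AFTER_APPLY }

final class ValueEvictor<T> {
    private final Predicate<T> shouldEvict;
    private final EvictionMode mode;

    ValueEvictor(Predicate<T> shouldEvict, EvictionMode mode) {
        this.shouldEvict = shouldEvict;
        this.mode = mode;
    }

    // Evaluates the window function and evicts every element matching the
    // predicate, wherever it sits in the buffer. The mode decides whether the
    // window function sees the buffer before or after eviction.
    <R> R fire(List<T> buffer, Function<List<T>, R> windowFunction) {
        if (mode == EvictionMode.BEFORE_APPLY) {
            buffer.removeIf(shouldEvict);
            return windowFunction.apply(buffer);
        }
        // AFTER_APPLY: give the function a copy of the full contents first.
        R result = windowFunction.apply(new ArrayList<>(buffer));
        buffer.removeIf(shouldEvict);
        return result;
    }
}
```

For the Item example above, the predicate would test the element's type field against "a".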
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I am doing this for a POC, so I think I can modify the Flink code base
>>>>>>>>>>>>>>>> to make these changes and build it, but I would appreciate any
>>>>>>>>>>>>>>>> suggestions on whether these are viable changes, or whether there would
>>>>>>>>>>>>>>>> be any performance issues if they were made. Any pointers on where to
>>>>>>>>>>>>>>>> start would also help (e.g., do I create a new class similar to
>>>>>>>>>>>>>>>> EvictingWindowOperator that extends WindowOperator?).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>>>>>> Vishnu Viswanath,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I did:
>>>>>>>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
>>>>>>>>>>>>>>>>> ;-)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> In the future, it might be good to have discussions directly on the ML
>>>>>>>>>>>>>>>>>>> and then change the document accordingly. This way everyone can
>>>>>>>>>>>>>>>>>>> follow the discussion on the ML. I also feel that Google Doc comments
>>>>>>>>>>>>>>>>>>> often don't give enough space for expressing more complex opinions.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate discussion on
>>>>>>>>>>>>>>>>>> dev@?
>>>>>>>>>>>>>>>>>> 
> 
