Hello,

This is a nice proposal that I think covers most of the cases.
The only thing that is missing would be a method:

void onFire(W window, TriggerContext ctx);

that is responsible for doing whatever is necessary when the
WindowOperator decides to fire, for example resetting
the counter of a CountTrigger to 0.

As a recap, the SimpleTrigger interface should be:

class SimpleTrigger {
 void onElement(T element, long timestamp, W window, TriggerContext ctx);
 boolean shouldFire(W window, TriggerContext ctx);

 void onMerge(W window, OnMergeContext ctx);
 void onFire(W window, TriggerContext ctx);
 void clear(W window, TriggerContext ctx);
}

The onMerge and onFire methods can be seen as callbacks:
onMerge is invoked when windows are merged (e.g. for session windows)
and onFire is invoked when the window actually fires.
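
To make the onFire callback concrete, here is a minimal Java sketch of how
it could look. It is only an illustration under assumptions, not the actual
Flink code: the TriggerContext below is a hypothetical map-backed stand-in,
onMerge is left out for brevity, and the CountTrigger/AllTrigger classes and
the per-trigger state key are names made up for this example. The point it
shows is that a child trigger does not reset its state when it would fire on
its own, only when the parent (All) really fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for per-window trigger state (not Flink's TriggerContext).
class TriggerContext {
    private final Map<String, Long> state = new HashMap<>();
    long get(String key) { return state.getOrDefault(key, 0L); }
    void put(String key, long value) { state.put(key, value); }
    void remove(String key) { state.remove(key); }
}

// The proposed interface, with onMerge omitted to keep the sketch short.
interface SimpleTrigger<T, W> {
    void onElement(T element, long timestamp, W window, TriggerContext ctx);
    boolean shouldFire(W window, TriggerContext ctx);
    void onFire(W window, TriggerContext ctx);
    void clear(W window, TriggerContext ctx);
}

// Counts elements; the counter is reset in onFire, not when the threshold is reached.
class CountTrigger<T, W> implements SimpleTrigger<T, W> {
    private final String stateKey; // assumed to be unique per trigger instance
    private final long threshold;

    CountTrigger(String stateKey, long threshold) {
        this.stateKey = stateKey;
        this.threshold = threshold;
    }
    public void onElement(T element, long timestamp, W window, TriggerContext ctx) {
        ctx.put(stateKey, ctx.get(stateKey) + 1);
    }
    public boolean shouldFire(W window, TriggerContext ctx) {
        return ctx.get(stateKey) >= threshold;
    }
    public void onFire(W window, TriggerContext ctx) {
        ctx.put(stateKey, 0L);
    }
    public void clear(W window, TriggerContext ctx) {
        ctx.remove(stateKey);
    }
}

// Fires only if every child would fire; forwards all callbacks to the children.
class AllTrigger<T, W> implements SimpleTrigger<T, W> {
    private final List<SimpleTrigger<T, W>> children;

    AllTrigger(List<SimpleTrigger<T, W>> children) {
        this.children = new ArrayList<>(children);
    }
    public void onElement(T element, long timestamp, W window, TriggerContext ctx) {
        for (SimpleTrigger<T, W> child : children) child.onElement(element, timestamp, window, ctx);
    }
    public boolean shouldFire(W window, TriggerContext ctx) {
        for (SimpleTrigger<T, W> child : children) {
            if (!child.shouldFire(window, ctx)) return false;
        }
        return true;
    }
    public void onFire(W window, TriggerContext ctx) {
        for (SimpleTrigger<T, W> child : children) child.onFire(window, ctx);
    }
    public void clear(W window, TriggerContext ctx) {
        for (SimpleTrigger<T, W> child : children) child.clear(window, ctx);
    }
}

class TriggerSketch {
    public static void main(String[] args) {
        TriggerContext ctx = new TriggerContext();
        List<SimpleTrigger<String, String>> children = new ArrayList<>();
        children.add(new CountTrigger<String, String>("count-a", 2));
        children.add(new CountTrigger<String, String>("count-b", 3));
        SimpleTrigger<String, String> all = new AllTrigger<String, String>(children);

        all.onElement("e1", 1L, "w", ctx);
        all.onElement("e2", 2L, "w", ctx);
        System.out.println(all.shouldFire("w", ctx)); // prints false: only count-a is satisfied
        all.onElement("e3", 3L, "w", ctx);
        System.out.println(all.shouldFire("w", ctx)); // prints true: both thresholds reached
        all.onFire("w", ctx);                          // what the operator would call on firing
        System.out.println(all.shouldFire("w", ctx)); // prints false: both counters were reset
    }
}
```

In this sketch the operator would call shouldFire() on the root trigger and,
only if it returns true, call onFire() on it, so the reset reaches every
child at the same moment.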

What do you think?

Kostas

> On Jul 25, 2016, at 3:34 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> yes, this is essentially the solution I had in my head but I went a bit
> further and generalized it.
> 
> Basically, to make triggers composable they should have this interface,
> let's call it SimpleTrigger for now:
> 
> class SimpleTrigger {
>  void onElement(T element, long timestamp, W window, TriggerContext ctx);
>  boolean shouldFire(W window, TriggerContext ctx);
>  void onMerge(W window, OnMergeContext ctx);
>  void clear(W window, TriggerContext ctx);
> }
> 
> notice how onElement() cannot return a TriggerResult anymore and how
> onEventTime() and onProcessingTime() of the currently existing Trigger
> interface were folded into shouldFire(). Each trigger essentially becomes a
> predicate that says at any given time whether they would fire the window.
> Having just one method that can decide whether to fire or not makes these
> easily composable to form complex triggers, thus enabling the trigger DSL
> we want to implement.
> 
> The way to go about implementing this is either to replace our current
> Trigger interface by this new interface or to keep our more powerful
> interface with all the customization options and have one
> SimpleTriggerTrigger that can execute a tree of SimpleTriggers. A rough
> sketch of this would be this:
> https://gist.github.com/aljoscha/66b0fcab89cd2b6190a63899f461067f
> 
> Cheers,
> Aljoscha
> 
> 
> 
> On Mon, 25 Jul 2016 at 14:33 Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
> 
>> Hi Aljoscha,
>> 
>> This was exactly one of the problems I also found.
>> 
>> The way I was thinking about it is the following:
>> 
>> Conceptually, time (event and processing) advances but state is a
>> fixed property of the window.
>> 
>> Given this, I modified the Count trigger to also ask for the
>> current state (count) of the window in all method (e.g. onEventTime and
>> onProcessingTime). This way the trigger can be composed and play
>> well with the other triggers.
>> 
>> If you have any more ideas on that and the rest of the problems I
>> sent in the previous email, please let me know.
>> 
>> Kostas
>> 
>>> On Jul 25, 2016, at 2:22 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>> 
>>> These are some very interesting thoughts! I have some more, based on
>> these:
>>> 
>>> What happens if you have for example this Trigger:
>>> All(Watermark.pastEndOfWindow(), Count.atLeast(10))
>>> 
>>> When would this even fire, i.e. what are the steps that lead to this
>>> combined trigger firing with the Trigger system that we currently have in
>>> place?
>>> 
>>> I have some thoughts but they are not compatible with the way we
>> currently
>>> handle triggers. I have to think some more, but please shoot if you have
>>> any ideas.
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.klou...@data-artisans.com
>>> 
>>> wrote:
>>> 
>>>> Forgot to say that the signature for the onFire() that I think fits
>> should
>>>> be:
>>>> 
>>>> void onFire(Window window, TriggerContext ctx) throws Exception;
>>>> 
>>>>> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <
>>>> k.klou...@data-artisans.com> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I started working on the new triggers proposed here and so far I can
>> see
>>>>> two shortcomings in the current state of the triggers that do not play
>>>> well
>>>>> with the new proposals, and more specifically the composite triggers
>> All
>>>>> and Any.
>>>>> 
>>>>> So here it goes:
>>>>> 
>>>>> 1) In the document posted above, there are some new useful trigger
>>>> combinations (like Any and All) which allow
>>>>> for combining primitive triggers. This decouples the TriggerResult of
>>>> each individual trigger from the action that
>>>>> is actually going to be executed. For example, an All trigger may have
>>>> one proposing FIRE while the other
>>>>> CONTINUE and the final result will be CONTINUE.
>>>>> 
>>>>> In this case, any action that  should be taken by each individual
>>>> trigger upon firing, e.g. cleaning its state as in the
>>>>> case of CountTrigger, should be postponed until the parent trigger
>> (All)
>>>> decides to fire.
>>>>> 
>>>>> For this, there should be a onFire() method in each trigger that does
>>>> exactly that. This method should be called in the
>>>>> fireOrCleanup() of the windowOperator, when the firing is successful.
>>>>> 
>>>>> 2) In the current implementation, when stateful triggers, like the
>>>> CountTrigger, are combined in a composite Trigger
>>>>> (with Any or All) their state is shared because the stateHandle is the
>>>> same for both. To solve this, the handle should
>>>>> become unique, BUT consistent for the same Trigger. The latter implies
>>>> that the handle for the same trigger after
>>>>> a node failure, should be the same as that of its predecessor (before
>>>> the failure).
>>>>> 
>>>>> Let me know your thoughts on these.
>>>>> 
>>>>> Kostas
>>>>> 
>>>>> 
>>>>>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>>> 
>>>>>> I'm proposing to get this small change into 1.1:
>>>>>> https://issues.apache.org/jira/browse/FLINK-4239 This will make our
>>>> lives
>>>>>> easier with the future proposed changes.
>>>>>> 
>>>>>> What do you think?
>>>>>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> these new features should make it into the 1.2 release. We are
>> already
>>>>>>> working on releasing 1.1 so it won't make it for that one.
>>>> unfortunately.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>> 
>>>>>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> BTW, do you have rough timeline in term of roll out it to
>> production?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Chen
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <
>>>> aljos...@apache.org>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> Chen commented this on the doc (I'm mirroring here so everyone can
>>>>>>>> follow):
>>>>>>>>> "It would be cool to be able to access last snapshot of window
>> states
>>>>>>>>> before it get purged. Pipeline author might consider put it to
>>>> external
>>>>>>>>> storage and deal with late arriving events by restore corresponding
>>>>>>>>> window."
>>>>>>>>> 
>>>>>>>>> My answer:
>>>>>>>>> This is partially covered by the section called "What Happens at
>>>>>>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to
>>>>>>>> introduce
>>>>>>>>> is that the window can have one final emission if there is new data
>>>> in
>>>>>>>> the
>>>>>>>>> buffers at cleanup time.
>>>>>>>>> 
>>>>>>>>> The work on this will also depend on this proposal:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>>>>>> With
>>>>>>>>> this, the WindowFunction can get meta data about the window firing
>>>> so it
>>>>>>>>> could be informed that this is the last firing before a cleanup and
>>>> that
>>>>>>>>> there already was an earlier, on-time firing.
>>>>>>>>> 
>>>>>>>>> Does this cover your concerns, Chen?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>> 
>>>>>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Sure. Currently, it looks like any element assigned to a too late
>>>>>>>> window
>>>>>>>>>> will be dropped silently😓 ?
>>>>>>>>>> 
>>>>>>>>>> Having a late window stream imply somehow Flink needs to add one
>>>> more
>>>>>>>>> state
>>>>>>>>>> to window and split window state cleanup from window retirement.
>>>>>>>>>> I would suggest as simple as adding a function in trigger called
>>>>>>>>>> OnLateElement and always fire_purge it would enable developer
>> aware
>>>>>>>> and
>>>>>>>>>> handle this case.
>>>>>>>>>> 
>>>>>>>>>> Chen
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <
>>>> aljos...@apache.org
>>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> @Chen I added a section at the end of the document regarding
>> access
>>>>>>>> to
>>>>>>>>>> the
>>>>>>>>>>> elements that are dropped as late. Right now, the section just
>>>>>>>> mentions
>>>>>>>>>>> that we have to do this but there is no real proposal yet for how
>>>>>>>> to do
>>>>>>>>>> it.
>>>>>>>>>>> Only a rough sketch so that we don't forget about it.
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> +1 for allowedLateness scenario.
>>>>>>>>>>>> 
>>>>>>>>>>>> The rationale behind is there are backfills or data issues hold
>>>>>>>>>> in-window
>>>>>>>>>>>> data till watermark pass end time. It cause sink write partial
>>>>>>>>> output.
>>>>>>>>>>>> 
>>>>>>>>>>>> Allow high allowedLateness threshold makes life easier to merge
>>>>>>>> those
>>>>>>>>>>>> results and overwrite partial output with correct output at
>> sink.
>>>>>>>> But
>>>>>>>>>>> yeah,
>>>>>>>>>>>> pipeline author is at risk of blow up statebackend with huge
>>>>>>>> states.
>>>>>>>>>>>> 
>>>>>>>>>>>> Alternatively, In some case, if sink allows read-check-merge
>>>>>>>>> operation,
>>>>>>>>>>>> window can explicit call out events ingested after
>>>>>>>> allowedLateness.
>>>>>>>>> It
>>>>>>>>>>> asks
>>>>>>>>>>>> pipeline author mitigated these events in a way outside of flink
>>>>>>>>>>> ecosystem.
>>>>>>>>>>>> The catch is that since everywhere in a flink job can replay and
>>>>>>>>>>>> checkpoint, notification should somehow includes these info as
>>>>>>>> well.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Chen
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <
>>>>>>>>>>>> k.klou...@data-artisans.com
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> In the effort to move the discussion to the mailing list,
>> rather
>>>>>>>>> than
>>>>>>>>>>> the
>>>>>>>>>>>>> doc,
>>>>>>>>>>>>> there was a comment in the doc:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> “It seems this proposal marries the allowed lateness of events
>>>>>>>> and
>>>>>>>>>> the
>>>>>>>>>>>>> discarding of window state. In most use cases this should be
>>>>>>>>>>> sufficient,
>>>>>>>>>>>>> but there are instances where having independent control of
>>>>>>>> these
>>>>>>>>> may
>>>>>>>>>>> be
>>>>>>>>>>>>> useful.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> For instance, you may have a job that computes some aggregate,
>>>>>>>>> like a
>>>>>>>>>>>> sum.
>>>>>>>>>>>>> You may want to keep the window state around for a while, but
>>>>>>>> not
>>>>>>>>> too
>>>>>>>>>>>> long.
>>>>>>>>>>>>> Yet you may want to continue processing late events after you
>>>>>>>>>> discarded
>>>>>>>>>>>> the
>>>>>>>>>>>>> window state. It is possible that your stream sinks can make
>>>>>>>> use of
>>>>>>>>>>> this
>>>>>>>>>>>>> data. For instance, they may be writing to a data store that
>>>>>>>>> returns
>>>>>>>>>> an
>>>>>>>>>>>>> error if a row already exists, which allow the sink to read the
>>>>>>>>>>> existing
>>>>>>>>>>>>> row and update it with the new data."
>>>>>>>>>>>>> 
>>>>>>>>>>>>> To which I would like to reply:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> If I understand your use-case correctly, I believe that the
>>>>>>>>> proposed
>>>>>>>>>>>>> binding of the allowed lateness to the state purging does not
>>>>>>>>> impose
>>>>>>>>>>> any
>>>>>>>>>>>>> problem. The lateness specifies the upper time bound, after
>>>>>>>> which
>>>>>>>>> the
>>>>>>>>>>>> state
>>>>>>>>>>>>> will be discarded. Between the start of a window and its (end +
>>>>>>>>>>>>> allowedLateness) you can write custom triggers that fire, purge
>>>>>>>> the
>>>>>>>>>>>> state,
>>>>>>>>>>>>> or do nothing. Given this, I suppose that, at the most extreme
>>>>>>>>> case,
>>>>>>>>>>> you
>>>>>>>>>>>>> can specify an allowed lateness of Long.MaxValue and do the
>>>>>>>> purging
>>>>>>>>>> of
>>>>>>>>>>>> the
>>>>>>>>>>>>> state "manually". By doing this, you remove the safeguard of
>>>>>>>>> letting
>>>>>>>>>>> the
>>>>>>>>>>>>> system purge the state at some point in time, and you can do
>>>>>>>> your
>>>>>>>>> own
>>>>>>>>>>>>> custom state management that fits your needs.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <
>>>>>>>>> aljos...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> @Vishnu Funny you should ask that because I have a design doc
>>>>>>>>> lying
>>>>>>>>>>>>> around.
>>>>>>>>>>>>>> I'll open a new mail thread to not hijack this one.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <
>>>>>>>>>>>>> vishnu.viswanat...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I was going through the suggested improvements in window,
>>>>>>>> and I
>>>>>>>>>> have
>>>>>>>>>>>>>>> few questions/suggestion on improvement regarding the
>>>>>>>> Evictor.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1) I am having a use case where I have to create a custom
>>>>>>>>> Evictor
>>>>>>>>>>> that
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>> evict elements from the window based on the value (e.g., if I
>>>>>>>>> have
>>>>>>>>>>>>> elements
>>>>>>>>>>>>>>> are of case class Item(id: Int, type:String) then evict
>>>>>>>> elements
>>>>>>>>>>> that
>>>>>>>>>>>>> has
>>>>>>>>>>>>>>> type="a"). I believe this is not currently possible.
>>>>>>>>>>>>>>> 2) this is somewhat related to 1) where there should be an
>>>>>>>>> option
>>>>>>>>>> to
>>>>>>>>>>>>> evict
>>>>>>>>>>>>>>> elements from anywhere in the window. not only from the
>>>>>>>>> beginning
>>>>>>>>>> of
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> window. (e.g., apply the delta function to all elements and
>>>>>>>>> remove
>>>>>>>>>>> all
>>>>>>>>>>>>>>> those don't pass. I checked the code and evict method just
>>>>>>>>> returns
>>>>>>>>>>> the
>>>>>>>>>>>>>>> number of elements to be removed and processTriggerResult
>>>>>>>> just
>>>>>>>>>> skips
>>>>>>>>>>>>> those
>>>>>>>>>>>>>>> many elements from the beginning.
>>>>>>>>>>>>>>> 3) Add an option to enables the user to decide if the
>>>>>>>> eviction
>>>>>>>>>>> should
>>>>>>>>>>>>>>> happen before the apply function or after the apply function.
>>>>>>>>>>>> Currently
>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> is before the apply function, but I have a use case where I
>>>>>>>> need
>>>>>>>>>> to
>>>>>>>>>>>>> first
>>>>>>>>>>>>>>> apply the function and evict afterward.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I am doing these for a POC so I think I can modify the flink
>>>>>>>>> code
>>>>>>>>>>> base
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> make these changes and build, but I would appreciate any
>>>>>>>>>> suggestion
>>>>>>>>>>> on
>>>>>>>>>>>>>>> whether these are viable changes or will there any
>>>>>>>> performance
>>>>>>>>>> issue
>>>>>>>>>>>> if
>>>>>>>>>>>>>>> these are done. Also any pointer on where to start(e.g, do I
>>>>>>>>>> create
>>>>>>>>>>> a
>>>>>>>>>>>>> new
>>>>>>>>>>>>>>> class similar to EvictingWindowOperator that extends
>>>>>>>>>>> WindowOperator?)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>>>>> Vishnu Viswanath,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <
>>>>>>>>>>> aljos...@apache.org
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I did:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>> 
>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
>>>>>>>>>>>>>>>> ;-)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <u...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <
>>>>>>>>>>>> aljos...@apache.org
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> In the future, it might be good to to discussions
>>>>>>>> directly on
>>>>>>>>>> the
>>>>>>>>>>>> ML
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> then change the document accordingly. This way everyone
>>>>>>>> can
>>>>>>>>>>> follow
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> discussion on the ML. I also feel that Google Doc comments
>>>>>>>>>> often
>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> give
>>>>>>>>>>>>>>>>>> enough space for expressing more complex opinions.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate
>>>>>>>>>>> discussion
>>>>>>>>>>>> on
>>>>>>>>>>>>>>>> dev@
>>>>>>>>>>>>>>>>> ?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>>> 
>> 
>> 
