Hi,

Yes, you can keep state in a WindowFunction if you use Flink’s state 
abstraction, which you can access from a RichWindowFunction via the 
RuntimeContext (or by using a ProcessWindowFunction).

Trigger purging behaviour makes a difference if the Trigger fires repeatedly 
before the watermark reaches the end of the window, for example a trigger that 
speculatively fires early. In those cases it can make sense to distinguish 
between just firing (FIRE), which gives you all accumulated window contents 
each time, and firing and purging (FIRE_AND_PURGE), which gives you only those 
elements that have accumulated since the last trigger firing.
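
To make the difference concrete, here is a minimal sketch in plain Java. It is 
only a model of a window buffer and does not use the Flink API; the class and 
method names (WindowBufferModel, fire, fireAndPurge) are made up for 
illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a window buffer; hypothetical names, not Flink API. */
final class WindowBufferModel {
    private final List<String> buffer = new ArrayList<>();

    /** Adds one element to the window contents. */
    void add(String element) {
        buffer.add(element);
    }

    /** Models FIRE: emit a copy of the contents and keep them for later firings. */
    List<String> fire() {
        return new ArrayList<>(buffer);
    }

    /** Models FIRE_AND_PURGE: emit a copy of the contents, then clear the buffer. */
    List<String> fireAndPurge() {
        List<String> emitted = new ArrayList<>(buffer);
        buffer.clear();
        return emitted;
    }
}
```

With fire() every firing sees everything added so far, which is what a 
cumulative count needs. With fireAndPurge() each firing sees only what arrived 
since the previous firing.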

GlobalWindows is not implemented by setting the allowed lateness very high. It 
is a WindowAssigner that assigns Long.MAX_VALUE as the max window timestamp, 
so the watermark will never pass the end of that GlobalWindow.

Regarding your use case: since you want to keep all data since the start, I 
would suggest using GlobalWindows, a custom Trigger that fires periodically, 
and a ProcessWindowFunction. In the ProcessWindowFunction you can make sure to 
process only those elements that you want to process, based on their timestamp 
and the current event time, which you can access from a ProcessWindowFunction.
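
A rough sketch of that logic in plain Java, using the four timestamps from 
your input example as milliseconds since 2017-07-16 00:00:00. This is not 
Flink code: the class and method names are hypothetical, and the fireTime 
parameter stands in for the current event time you would read inside the 
ProcessWindowFunction. In a real Trigger the value from nextFireTime would be 
the timestamp of the event-time timer you register.

```java
import java.util.List;

/** Plain-Java model of periodic firing with a cumulative count; not Flink API. */
final class PeriodicCumulativeCount {

    /** Next firing time strictly after the given time, aligned to the interval. */
    static long nextFireTime(long time, long intervalMillis) {
        return time - Math.floorMod(time, intervalMillis) + intervalMillis;
    }

    /** Counts buffered elements whose timestamp is not after the firing time. */
    static long countUpTo(List<Long> timestamps, long fireTime) {
        long count = 0;
        for (long ts : timestamps) {
            if (ts <= fireTime) {
                count++;
            }
        }
        return count;
    }
}
```

For the timestamps 1000, 12000, 3786000 and 8410000 and an interval of 
3600000 ms, countUpTo gives 2, 3 and 4 at the first three hourly firing times, 
which is the cumulative output you described. The filter matters because the 
GlobalWindow can already contain elements that are later than the firing 
timestamp.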

If you don’t want to keep all events indefinitely (which could eventually blow 
up your state size) you can use an Evictor to sometimes evict certain events 
from the window buffers.
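
As an illustration of what such an eviction rule could look like, here is a 
small plain-Java model that drops everything older than a retention period. 
It does not implement Flink's Evictor interface; the class name, method name 
and the retention parameter are assumptions made for the example.

```java
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of a retention-based eviction rule; not Flink API. */
final class RetentionEvictionModel {

    /**
     * Removes timestamps older than (currentTime - retentionMillis) from the
     * given mutable list and returns how many were removed.
     */
    static int evictOlderThan(List<Long> timestamps, long currentTime, long retentionMillis) {
        long cutoff = currentTime - retentionMillis;
        int evicted = 0;
        for (Iterator<Long> it = timestamps.iterator(); it.hasNext();) {
            if (it.next() < cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }
}
```

Keep in mind that evicted elements no longer show up in later firings, so a 
count computed from the buffer is then a count over the retention period and 
not over everything since the start.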

Best,
Aljoscha

> On 20. Jul 2017, at 12:24, jad mad <jadmad0...@gmail.com> wrote:
> 
> Hello Aljoscha,
> 
> > I’m afraid this will not work well because a WindowAssigner should be 
> > stateless
> ok, now understand this.
> How about inside a custom WindowFunction(...), a bad idea to have states as 
> well?
> 
> the default trigger for EventTimeTumblingWindow is the EventTimeTrigger(...).
> looking at the definition file, there are a few return TriggerResult.FIRE; 
> but not a single PURGE.
> even so, each time the contents get cleared the time passes a window end.
> Is this what you meant by 
> >When the watermark passes the end of a window plus the allowed lateness the 
> >window contents are being purged. 
> ?
> if yes, return TriggerResult.FIRE; or return TriggerResult.FIRE_AND_PURGE
> seems less important for a trigger implementation because the contents will 
> be cleared any way
> and the lateness amount is more important?
> And is this how a GlobalWindows implemented by setting the "lateness" to a 
> huge number 
> so that it keeps all things in it?
> 
> so, back to my original question.
> in order to keep everything from start like a GlobalWindows, let it fire 
> periodically and 
> then perform some calcs, what combination of window assigner, trigger, and/or 
> custom window function
> I may use?  better if there'd be a simple working sample.
> 
> thank you a lot!
> jad
> 
> On Thu, Jul 20, 2017 at 5:45 PM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Hi,
> 
> I’m afraid this will not work well because a WindowAssigner should be 
> stateless, i.e. it should not keep any state in fields. The reason is that 
> there can be several WindowAssigners used on the different partitions and the 
> order in which a WindowAssigner sees the incoming elements is also not 
> guaranteed. That is, you might set a timestamp in the “first_timestamp” field 
> that is not chronologically the “first timestamp”.
> 
> The reason for your windows being purged is probably the allowed lateness, 
> which is zero by default. When the watermark passes the end of a window plus 
> the allowed lateness the window contents are being purged. You can configure 
> the allowed lateness via WindowedStream.allowedLateness(). You should be 
> careful, though, because if you set this too high you might never clean up 
> your window state and therefore have ever growing state.
> 
> Best,
> Aljoscha
> 
>> On 18. Jul 2017, at 15:05, jad mad <jadmad0...@gmail.com 
>> <mailto:jadmad0...@gmail.com>> wrote:
>> 
>> Aljoscha,
>> 
>> what a great answer and this is what I'd expected!
>> 
>> as a workaround I've modified the EventTimeSlidingWindow a little bit to a 
>> custom WindowAssigner like below : 
>> the a few differences are 
>> 1.storing the first timestamp in a variable "first_timestamp", 
>> 2.used this time stamp as the any following windows' start time.
>> @PublicEvolving
>> public class MySlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
>>     private static final long serialVersionUID = 1L;
>>     private final long size;
>>     private final long slide;
>>     private final long offset;
>>     private long first_timestamp = -1L; // added by me!
>> 
>>     protected MySlidingEventTimeWindows(long size, long slide, long offset) {
>>         if (offset >= 0L && offset < slide && size > 0L) {
>>             this.size = size;
>>             this.slide = slide;
>>             this.offset = offset;
>>         } else {
>>             throw new IllegalArgumentException(
>>                 "SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
>>         }
>>     }
>> 
>>     public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>>         if (timestamp <= Long.MIN_VALUE) {
>>             throw new RuntimeException(
>>                 "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
>>                 + "Is the time characteristic set to 'ProcessingTime', "
>>                 + "or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
>>         } else {
>>             if (this.first_timestamp == -1L) { // added by me!
>>                 this.first_timestamp = timestamp;
>>                 System.out.println("===================== " + this.first_timestamp + " ========================");
>>             }
>>             List<TimeWindow> windows = new ArrayList<>((int) (this.size / this.slide));
>>             long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide);
>> 
>>             for (long start = lastStart; start > timestamp - this.size; start -= this.slide) {
>>                 // original implementation:
>>                 // windows.add(new TimeWindow(start, start + this.size));
>>                 windows.add(new TimeWindow(this.first_timestamp, start + this.size)); // modified by me!
>>             }
>>             return windows;
>>         }
>>     }
>> the result I get from MyWindowFunction(...) is like below : 
>> 2017-01-01 00:17:39  2017-01-01 00:00:01     2
>> 2017-01-01 00:17:39  2017-01-01 00:00:02     4
>> 2017-01-01 00:17:39  2017-01-01 00:00:03     4
>> 2017-01-01 00:17:39  2017-01-01 00:00:04     10
>> 2017-01-01 00:17:39  2017-01-01 00:00:05     19
>> 2017-01-01 00:17:39  2017-01-01 00:00:06     19
>> 2017-01-01 00:17:39  2017-01-01 00:00:07     20
>> 2017-01-01 00:17:39  2017-01-01 00:00:08     23
>> 2017-01-01 00:17:39  2017-01-01 00:00:09     21
>> 2017-01-01 00:17:39  2017-01-01 00:00:10     7
>> 2017-01-01 00:17:39  2017-01-01 00:00:11     2
>> 2017-01-01 00:17:39  2017-01-01 00:00:12     5
>> 2017-01-01 00:17:39  2017-01-01 00:00:13     12
>> 2017-01-01 00:17:39  2017-01-01 00:00:14     17
>> 2017-01-01 00:17:39  2017-01-01 00:00:15     9
>> 2017-01-01 00:17:39  2017-01-01 00:00:16     8
>> 
>> things I don't seem to understand are 
>> 1. when my inputs' first line time stamp is 2017-01-01 00:00:00 why is 
>> 2017-01-01 00:17:39 shown up in my result as 
>>      each sliding window's start time?
>>     basically, I'm just printing out the time stamp came with the first 
>> iterable object's element in MyWindowFunction.
>> 2. I made MyWindowAssigner in a hope that the starting time is fixed and the 
>> contents not being purged.
>>     however, from the results, we can see it works just as a normal 
>> EventTimeSlidingWindow with contents
>>     been purged.
>>     How can I make it not to throw away its window contents even after each 
>> time firing.
>> 3. this MyWindowAssigner(...) attempt arose as an effort based on your 
>> previous advice using a 
>>     different WindowFunction. wonder if I'm heading to the right direction 
>> or not.
>>     
>> thank you very much!
>> jad
>> 
>> On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> Ah, I see. The problem is that the watermark has slightly tricky semantics: 
>> A watermark T says that there will not be elements with a timestamp <= T in 
>> the future. It does not say that there have not yet been elements with a 
>> timestamp > T. In your specific case, this means that there will be elements 
>> in the GlobalWindow that have a timestamp that is after the firing timestamp 
>> of your trigger. If you want to make sure that elements are put into 
>> buckets based on their timestamp, then you need to use a different 
>> WindowAssigner, because GlobalWindows simply puts every element into the 
>> same bucket (window).
>> 
>> Regarding the firing timestamp, it’s currently not possible to get that 
>> from within a WindowFunction.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 16. Jul 2017, at 12:16, jad mad <jadmad0...@gmail.com 
>>> <mailto:jadmad0...@gmail.com>> wrote:
>>> 
>>> Hello Aljoscha,
>>> 
>>> thank you very much for your reply. the issue with me is two-fold.
>>> first of all, 
>>> the thing I wanted to achieve was having a GlobalWindows and let it fire 
>>> periodically, say 1 hour or 1 day, and then do some custom calculation.
>>> this custom trigger part I've implemented seems working well.
>>> 
>>> currently, when every time my custom trigger fires periodically, the 
>>> elements of iterable object
>>> passed onto my custom WindowFunction contains whole inputs from the start 
>>> to the end rather than
>>> from start to the timing(event time timestamp) where each time trigger 
>>> fires.
>>> have been worked on this for a week now but not being able to find any 
>>> solution yet.
>>> 
>>> input example. 
>>> 2017-07-16 00:00:01, x
>>> 2017-07-16 00:00:12, x
>>> 2017-07-16 01:03:06, x
>>> 2017-07-16 02:20:10, x
>>> 
>>> In this case, a GlobalWindows with 1-hour periodical trigger, designed to 
>>> count the cumulative record in MyWindowFunction should emit something like
>>> 2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
>>> 2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
>>> 2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
>>> ↑ the start time stamp doesn't change!
>>> 
>>> now, what I get is like
>>> 2017-07-16 00:00:00 ~ , 4
>>> 2017-07-16 00:00:00 ~ , 4
>>> 2017-07-16 00:00:00 ~ , 4
>>> ↑every line the same results...
>>> 
>>> public class MyWindowFunction<T, W extends Window> implements
>>>         WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {
>>> 
>>>     @Override
>>>     public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable,
>>>             Collector<Tuple3<String, String, String>> out) throws Exception {
>>> 
>>>         for (Tuple2<String, String> element : iterable) {
>>>             ...
>>>         }
>>>         out.collect(new Tuple3<String, String, String>("...", "...", "..."));
>>>     }
>>> }
>>> Secondly, for a GlobalWindows firing periodically, how do you get the 
>>> periodical firing time stamp inside of
>>> your MyWindowFunction? (the missing ~ part of ending time stamp in above 
>>> example)
>>> 
>>> really appreciate the help!
>>> jad
>>> 
>>> 
>>> On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <aljos...@apache.org 
>>> <mailto:aljos...@apache.org>> wrote:
>>> Hi,
>>> 
>>> Ok, then I misunderstood. Yes, a PurgingTrigger is similar (the same) to 
>>> always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I 
>>> thought your problem was that data is never cleared away when using 
>>> GlobalWindows. Is that not the case?
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 14. Jul 2017, at 16:29, jad mad <jadmad0...@gmail.com 
>>>> <mailto:jadmad0...@gmail.com>> wrote:
>>>> 
>>>> Hi Aljoscha
>>>> 
>>>> thanks for the comment. 
>>>> is wrapping by a PurgingTrigger.of() the same as doing "return 
>>>> TriggerResult.FIRE_AND_PURGE;" 
>>>> inside of a custom trigger?
>>>> 
>>>> gave it a test and the result seems the opposite of what I meant...
>>>> instead of throwing away previous windows' contents, I wanna keep them
>>>> all the way till the end. 
>>>> that way I can get the cumulative counts of all input.
>>>> 
>>>> wonder how to achieve it.
>>>> anyone?
>>>> 
>>>> jad
>>>> 
>>>> 
>>>> On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <aljos...@apache.org 
>>>> <mailto:aljos...@apache.org>> wrote:
>>>> Window contents are only purged from state if the Trigger says so or if 
>>>> the watermark passes the garbage collection horizon for a given window. 
>>>> With GlobalWindows, the GC horizon is never reached, that leaves Triggers.
>>>> 
>>>> You can create a Trigger that purges every time it fires by wrapping it in 
>>>> a PurgingTrigger, i.e.
>>>> 
>>>> .trigger(PurgingTrigger.of(<my trigger>))
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 13. Jul 2017, at 14:00, jad mad <jadmad0...@gmail.com 
>>>>> <mailto:jadmad0...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Prashant,
>>>>> 
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> 
>>>>> actually I could make my custom trigger to fire periodically.
>>>>> The problem is the element set stored in the iterable variable 
>>>>> is always uniform which is not what I'm expecting...
>>>>> 
>>>>> private static class MyWindowFunction_Window...
>>>>>          ...    
>>>>>        @Override
>>>>>         public void apply(Tuple tuple, W window, Iterable<MyClass> 
>>>>> iterable,
>>>>>              ...
>>>>>              for(MyClass element : iterable)
>>>>> 
>>>>> does anyone have any idea on this?
>>>>> thanks a lot in advance,
>>>>> jad
>>>>> 
>>>>> 
>>>>> On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak 
>>>>> <prash...@intellifylearning.com <mailto:prash...@intellifylearning.com>> 
>>>>> wrote:
>>>>> Hi
>>>>> 
>>>>> We have custom operators using global windows and are using event time.
>>>>> 
>>>>> How are you specifying event time as the time characteristic?
>>>>> 
>>>>> Prashant
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> View this message in context: 
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html
>>>>>  
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html>
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list 
>>>>> archive at Nabble.com <http://nabble.com/>.
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
>> 
> 
> 
