Hi,

I’m afraid this will not work well because a WindowAssigner should be stateless, i.e. it should not keep any state in fields. The reason is that several WindowAssigner instances can be in use on different partitions, and the order in which an instance sees the incoming elements is not guaranteed either. That is, you might set a timestamp in the “first_timestamp” field that is not chronologically the first timestamp.
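To illustrate the point about state in fields, here is a minimal plain-Java sketch with no Flink dependency. FirstTimestampHolder and startAfter are hypothetical stand-ins for the custom assigner and for a parallel instance receiving elements; they are not Flink API, and the timestamps are made up for the example.

```java
import java.util.List;

final class StatefulAssignerDemo {

    /** Hypothetical stand-in for an assigner that remembers the first timestamp it sees. */
    static final class FirstTimestampHolder {
        private long firstTimestamp = -1L;

        /** Returns the window start this instance would use for the given element. */
        long assignStart(long timestamp) {
            if (firstTimestamp == -1L) {
                firstTimestamp = timestamp; // state kept in a field: the problematic part
            }
            return firstTimestamp;
        }
    }

    /** Feeds timestamps to a fresh instance in arrival order and returns the start it settled on. */
    static long startAfter(List<Long> arrivalOrder) {
        FirstTimestampHolder holder = new FirstTimestampHolder();
        long start = -1L;
        for (long ts : arrivalOrder) {
            start = holder.assignStart(ts);
        }
        return start;
    }

    public static void main(String[] args) {
        // Elements with timestamps 1000, 2000 and 3000 are spread over two instances.
        // Instance A receives its two elements out of order.
        long startA = startAfter(List.of(3000L, 1000L));
        long startB = startAfter(List.of(2000L));

        System.out.println("instance A start = " + startA); // prints 3000
        System.out.println("instance B start = " + startB); // prints 2000
        // Neither instance picked 1000, the chronologically first timestamp,
        // and the two instances disagree with each other.
    }
}
```

Each instance simply keeps whatever arrived first, so the "first" timestamp depends on partitioning and arrival order rather than on event time.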
The reason for your windows being purged is probably the allowed lateness, which is zero by default. When the watermark passes the end of a window plus the allowed lateness, the window contents are purged. You can configure the allowed lateness via WindowedStream.allowedLateness(). You should be careful, though, because if you set this too high you might never clean up your window state and therefore end up with ever-growing state.

Best,
Aljoscha

> On 18. Jul 2017, at 15:05, jad mad <jadmad0...@gmail.com> wrote:
>
> Aljoscha,
>
> what a great answer and this is what I'd expected!
>
> as a workaround I've modified the EventTimeSlidingWindow a little bit to a
> custom WindowAssigner like below :
> a few differences are
> 1. storing the first timestamp in a variable "first_timestamp",
> 2. using this timestamp as the start time of all following windows.
>
> @PublicEvolving
> public class MySlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>     private final long size;
>     private final long slide;
>     private final long offset;
>     private long first_timestamp = -1L; // added by me!
>
>     protected MySlidingEventTimeWindows(long size, long slide, long offset) {
>         if(offset >= 0L && offset < slide && size > 0L) {
>             this.size = size;
>             this.slide = slide;
>             this.offset = offset;
>         } else {
>             throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
>         }
>     }
>
>     public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>         if(timestamp <= -9223372036854775808L) {
>             throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
>         } else {
>             if(this.first_timestamp == -1L) {// added by me!
>                 this.first_timestamp = timestamp;
>                 System.out.println("===================== " + this.first_timestamp + " ========================");
>             }
>             List<TimeWindow> windows = new ArrayList((int)(this.size / this.slide));
>             long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide);
>
>             for(long start = lastStart; start > timestamp - this.size; start -= this.slide) {
>                 //windows.add(new TimeWindow(start, start + this.size)); // original implementation
>                 windows.add(new TimeWindow(this.first_timestamp, start + this.size)); // modified by me!
>             }
>             return windows;
>         }
>     }
>
> the result I get from MyWindowFunction(...) is like below :
> 2017-01-01 00:17:39 2017-01-01 00:00:01 2
> 2017-01-01 00:17:39 2017-01-01 00:00:02 4
> 2017-01-01 00:17:39 2017-01-01 00:00:03 4
> 2017-01-01 00:17:39 2017-01-01 00:00:04 10
> 2017-01-01 00:17:39 2017-01-01 00:00:05 19
> 2017-01-01 00:17:39 2017-01-01 00:00:06 19
> 2017-01-01 00:17:39 2017-01-01 00:00:07 20
> 2017-01-01 00:17:39 2017-01-01 00:00:08 23
> 2017-01-01 00:17:39 2017-01-01 00:00:09 21
> 2017-01-01 00:17:39 2017-01-01 00:00:10 7
> 2017-01-01 00:17:39 2017-01-01 00:00:11 2
> 2017-01-01 00:17:39 2017-01-01 00:00:12 5
> 2017-01-01 00:17:39 2017-01-01 00:00:13 12
> 2017-01-01 00:17:39 2017-01-01 00:00:14 17
> 2017-01-01 00:17:39 2017-01-01 00:00:15 9
> 2017-01-01 00:17:39 2017-01-01 00:00:16 8
>
> things I don't seem to understand are
> 1. when my input's first line timestamp is 2017-01-01 00:00:00, why does
> 2017-01-01 00:17:39 show up in my result as each sliding window's start time?
> basically, I'm just printing out the timestamp that came with the first
> element of the iterable in MyWindowFunction.
> 2. I made MyWindowAssigner in the hope that the starting time is fixed and the
> contents are not purged.
> however, from the results, we can see it works just like a normal
> EventTimeSlidingWindow with contents being purged.
> How can I make it not throw away its window contents even after each
> firing?
> 3. this MyWindowAssigner(...) attempt arose as an effort based on your
> previous advice about using a different WindowFunction. wonder if I'm heading
> in the right direction or not.
>
> thank you very much!
> jad
>
> On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Ah, I see. The problem is that the watermark has slightly tricky semantics: A
> watermark T says that there will not be elements with a timestamp <= T in the
> future. It does not say that there have not yet been elements with a
> timestamp > T. In your specific case, this means that there will be elements
> in the GlobalWindow that have a timestamp that is after the firing timestamp
> of your trigger. If you want to make sure that elements are put into
> buckets based on their timestamp, then you need to use a different
> WindowAssigner, because GlobalWindows simply puts every element into the same
> bucket (window).
>
> Regarding the firing timestamp, it’s currently not possible to get that from
> within a WindowFunction.
>
> Best,
> Aljoscha
>
>> On 16. Jul 2017, at 12:16, jad mad <jadmad0...@gmail.com> wrote:
>>
>> Hello Aljoscha,
>>
>> thank you very much for your reply. the issue with me is two-fold.
>> first of all,
>> the thing I wanted to achieve was having a GlobalWindows and let it fire
>> periodically, say every 1 hour or 1 day, and then do some custom calculation.
>> this custom trigger part I've implemented seems to be working well.
>>
>> currently, every time my custom trigger fires, the elements of the iterable
>> object passed to my custom WindowFunction contain the whole input from the
>> start to the end rather than from the start to the point (event time
>> timestamp) at which the trigger fires.
>> have been working on this for a week now but have not been able to find any
>> solution yet.
>>
>> input example.
>> 2017-07-16 00:00:01, x
>> 2017-07-16 00:00:12, x
>> 2017-07-16 01:03:06, x
>> 2017-07-16 02:20:10, x
>>
>> In this case, a GlobalWindows with a 1-hour periodic trigger, designed to
>> count the cumulative records in MyWindowFunction, should emit something like
>> 2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
>> 2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
>> 2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
>> ↑ the start time stamp doesn't change!
>>
>> now, what I get is like
>> 2017-07-16 00:00:00 ~ , 4
>> 2017-07-16 00:00:00 ~ , 4
>> 2017-07-16 00:00:00 ~ , 4
>> ↑ every line the same results...
>>
>> public class MyWindowFunction<T, W extends Window> implements WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {
>>
>>     @Override
>>     public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable, Collector<Tuple3<String, String, String>> out) throws Exception {
>>
>>         for(Tuple2<String, String> element : iterable)
>>         {
>>             ...
>>         }
>>         out.collect(new Tuple3<String, String, String>("...", "...", "..."));
>>     }
>> }
>>
>> Secondly, for a GlobalWindows firing periodically, how do you get the
>> periodic firing timestamp inside of your MyWindowFunction? (the missing
>> ending timestamp after the ~ in the above example)
>>
>> really appreciate the help!
>> jad
>>
>> On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>>
>> Ok, then I misunderstood. Yes, a PurgingTrigger is similar (the same) to
>> always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I
>> thought your problem was that data is never cleared away when using
>> GlobalWindows. Is that not the case?
>>
>> Best,
>> Aljoscha
>>
>>> On 14. Jul 2017, at 16:29, jad mad <jadmad0...@gmail.com> wrote:
>>>
>>> Hi Aljoscha
>>>
>>> thanks for the comment.
>>> is wrapping in a PurgingTrigger.of() the same as doing "return
>>> TriggerResult.FIRE_AND_PURGE;"
>>> inside of a custom trigger?
>>>
>>> gave it a test and the result seems the opposite of what I meant...
>>> instead of throwing away previous windows' contents, I wanna keep them
>>> all the way till the end.
>>> that way I can get the cumulative counts of all input.
>>>
>>> wonder how to achieve it.
>>> anyone?
>>>
>>> jad
>>>
>>> On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> Window contents are only purged from state if the Trigger says so or if the
>>> watermark passes the garbage collection horizon for a given window. With
>>> GlobalWindows, the GC horizon is never reached, so that leaves Triggers.
>>>
>>> You can create a Trigger that purges every time it fires by wrapping it in
>>> a PurgingTrigger, i.e.
>>>
>>> .trigger(PurgingTrigger.of(<my trigger>))
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 13. Jul 2017, at 14:00, jad mad <jadmad0...@gmail.com> wrote:
>>>>
>>>> Hi Prashant,
>>>>
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>> actually I could make my custom trigger fire periodically.
>>>> The problem is that the element set stored in the iterable variable
>>>> is always the same, which is not what I'm expecting...
>>>>
>>>> private static class MyWindowFunction_Window...
>>>> ...
>>>>     @Override
>>>>     public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
>>>>     ...
>>>>         for(MyClass element : iterable)
>>>>
>>>> does anyone have any idea on this?
>>>> thanks a lot in advance,
>>>> jad
>>>>
>>>> On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak
>>>> <prash...@intellifylearning.com> wrote:
>>>> Hi
>>>>
>>>> We have custom operators using global windows and are using event time.
>>>>
>>>> How are you specifying event time as the time characteristic?
>>>>
>>>> Prashant
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html
>>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.