In my specific case I can be more precise about when it next makes sense to check. For all subsequent events I only record a little state about the event stream (i.e. the first and last event time). I then register a new timer only from within the timer handler, which dramatically limits the number of distinct timers that get set.
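A minimal sketch of that approach in plain Java, with no Flink dependency so it can be run and unit tested directly. It assumes millisecond timestamps, a 30-minute idle timeout, and that the replayed events all belong to one visit (every gap is shorter than the timeout). `LazySessionTimer`, `onElement`, `onTimer` and `simulate` are hypothetical names that only mirror the shape of a trigger, not Flink's API.

```java
/**
 * Hypothetical, Flink-free model of the "lazy timer" idea: elements only
 * update the first/last event time, a timer is registered only when none is
 * pending, and the timer handler re-arms itself while the visit is active.
 * Assumes a single visit (all gaps shorter than the timeout).
 */
final class LazySessionTimer {
    private final long timeout;
    private long firstSeen = -1;
    private long lastSeen = -1;
    private long pendingTimer = -1; // -1 means "no timer registered"
    private long closedAt = -1;     // time at which the visit was emitted
    private int timersRegistered = 0;

    LazySessionTimer(long timeout) {
        this.timeout = timeout;
    }

    /** Per element: record state; register a timer only if none is pending. */
    void onElement(long timestamp) {
        if (firstSeen < 0) {
            firstSeen = timestamp;
        }
        lastSeen = Math.max(lastSeen, timestamp);
        if (pendingTimer < 0) {
            register(lastSeen + timeout);
        }
    }

    /** Timer handler: emit the visit if idle long enough, otherwise re-arm once. */
    boolean onTimer(long time) {
        pendingTimer = -1;
        if (time >= lastSeen + timeout) {
            closedAt = time;
            return true;
        }
        register(lastSeen + timeout);
        return false;
    }

    private void register(long time) {
        pendingTimer = time;
        timersRegistered++;
    }

    /** Replays sorted event times, firing the pending timer whenever time passes it. */
    static LazySessionTimer simulate(long[] sortedEvents, long timeout) {
        LazySessionTimer visit = new LazySessionTimer(timeout);
        for (long t : sortedEvents) {
            while (visit.pendingTimer >= 0 && visit.pendingTimer <= t) {
                visit.onTimer(visit.pendingTimer);
            }
            visit.onElement(t);
        }
        // No more events: keep firing until the visit is emitted.
        while (visit.pendingTimer >= 0 && !visit.onTimer(visit.pendingTimer)) {
            // onTimer re-armed the timer at lastSeen + timeout; fire it next
        }
        return visit;
    }

    int timersRegistered() { return timersRegistered; }
    long closedAt() { return closedAt; }
    long firstSeen() { return firstSeen; }
    long lastSeen() { return lastSeen; }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] clicks = new long[60]; // one click per minute for an hour
        for (int i = 0; i < clicks.length; i++) {
            clicks[i] = i * minute;
        }
        LazySessionTimer visit = simulate(clicks, 30 * minute);
        System.out.println("timers registered: " + visit.timersRegistered());
        System.out.println("visit emitted at minute " + visit.closedAt() / minute);
    }
}
```

For an hour of clicks, one per minute, `main` prints 4 timer registrations and an emission at minute 89, whereas a trigger that re-registers on every element would register 60 timers for the same input. The cost moves into the timer handler (one comparison and at most one re-registration per firing).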
Niels

On Fri, Nov 27, 2015 at 4:11 PM, Aljoscha Krettek <aljoscha.kret...@gmail.com> wrote:

> Hi,
> yes, you are right in your analysis. Did you try running it with the timer
> always being set? Maybe it’s not the bottleneck of the computation. I would
> be very interested in seeing how this behaves, since I only did tests with
> regular time windows, where the first if statement almost always returns
> directly, which is very cheap.
>
> Cheers,
> Aljoscha
>
> On 27 Nov 2015, at 13:59, Niels Basjes <ni...@basjes.nl> wrote:
> >
> > Hi,
> >
> > Thanks for all this input.
> > I didn't know about the
> > // a trigger can only have 1 timer so we remove the old trigger when setting the new one
> >
> > This insight is of major importance to me.
> > Let me explain:
> > I found this code in the WindowOperator:
> >
> > @Override
> > public void registerEventTimeTimer(long time) {
> >     if (watermarkTimer == time) {
> >         // we already have set a trigger for that time
> >         return;
> >     }
> >     Set<Context> triggers = watermarkTimers.get(time);
> >     if (triggers == null) {
> >         triggers = new HashSet<>();
> >         watermarkTimers.put(time, triggers);
> >     }
> >     this.watermarkTimer = time;
> >     triggers.add(this);
> > }
> >
> > and
> >
> > if (time == watermarkTimer) {
> >     watermarkTimer = -1;
> >     Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
> >
> > Effectively the new value is stored, while the old entries are still
> > processed: at the moment such an old timer fires, the call is simply not
> > forwarded into the application.
> > So if I did it as you show in your example, I would have as many trigger
> > entries in the watermarkTimers map as events I have seen.
> > My application will (in total) handle about 50K events/sec, resulting in
> > thousands of 'onEventTime' calls per second.
> >
> > So thank you. I now understand I have to be more careful with these timers!
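To make that effect concrete, here is a hypothetical, Flink-free model of the bookkeeping in the quoted WindowOperator code: a sorted map from timer time to the contexts that registered it, plus a per-context `watermarkTimer` that only remembers the latest registration. `TimerBookkeeping`, `advanceWatermark` and `forwardedCalls` are illustrative names; the real operator differs in detail.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical model of the timer bookkeeping quoted from WindowOperator. */
final class TimerBookkeeping {
    // timer time -> contexts (windows) that registered a timer for that time
    final TreeMap<Long, Set<Context>> watermarkTimers = new TreeMap<>();
    int forwardedCalls = 0; // how often the trigger's onEventTime would run

    final class Context {
        long watermarkTimer = -1; // only the most recent registration counts

        void registerEventTimeTimer(long time) {
            if (watermarkTimer == time) {
                return; // we already have set a trigger for that time
            }
            // The entry for the previous time is NOT removed; it becomes stale.
            watermarkTimers.computeIfAbsent(time, t -> new HashSet<>()).add(this);
            watermarkTimer = time;
        }
    }

    Context newContext() {
        return new Context();
    }

    /** Visits every timer entry at or before the watermark, stale ones included. */
    void advanceWatermark(long watermark) {
        while (!watermarkTimers.isEmpty() && watermarkTimers.firstKey() <= watermark) {
            Map.Entry<Long, Set<Context>> entry = watermarkTimers.pollFirstEntry();
            long time = entry.getKey();
            for (Context ctx : entry.getValue()) {
                if (ctx.watermarkTimer == time) {
                    ctx.watermarkTimer = -1;
                    forwardedCalls++; // the real operator calls trigger.onEventTime(...) here
                }
            }
        }
    }

    public static void main(String[] args) {
        TimerBookkeeping operator = new TimerBookkeeping();
        Context window = operator.newContext();
        // One window, 1000 events, each re-registering "event time + 30 s".
        for (long i = 0; i < 1000; i++) {
            window.registerEventTimeTimer(i * 10 + 30_000);
        }
        System.out.println("map entries: " + operator.watermarkTimers.size());
        operator.advanceWatermark(Long.MAX_VALUE);
        System.out.println("onEventTime calls: " + operator.forwardedCalls);
    }
}
```

For one window that re-registers a timer on each of 1000 events with distinct times, `main` prints 1000 map entries but only one forwarded call: every stale entry is still stored and visited before being discarded.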
> >
> > Niels Basjes
> >
> > On Fri, Nov 27, 2015 at 11:28 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi Niels,
> > do the records that arrive from Kafka already have the session ID or do
> > you want to assign them inside your Flink job based on the idle timeout?
> >
> > For the rest of your problems you should be able to get by with what
> > Flink provides:
> >
> > The triggering can be done using a custom Trigger that fires after we
> > haven’t seen an element for 30 minutes.
> >
> > public class TimeoutTrigger implements Trigger<Object, Window> {
> >     private static final long serialVersionUID = 1L;
> >
> >     @Override
> >     public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
> >         // on every element it will set a timer for 30 seconds in the future
> >         // a trigger can only have 1 timer so we remove the old trigger when setting the new one
> >         ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // this is 30 seconds but you can change it
> >         return TriggerResult.CONTINUE;
> >     }
> >
> >     @Override
> >     public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
> >         return TriggerResult.CONTINUE;
> >     }
> >
> >     @Override
> >     public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
> >         return TriggerResult.FIRE_AND_PURGE;
> >     }
> >
> >     @Override
> >     public String toString() {
> >         return "TimeoutTrigger()";
> >     }
> > }
> >
> > you would use it like this:
> > stream.keyBy(…).window(…).trigger(new TimeoutTrigger())
> >
> > For writing to files you could use the RollingSink (
> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem).
> > I think this does pretty much what you want. You can specify how large the
> > files that it writes are, and it can also roll to new files on a specified
> > time interval.
> >
> > Please let us know if you need more information.
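The trigger's behaviour can also be checked without a cluster by replacing `System.currentTimeMillis()` with a fake clock. A minimal sketch, assuming the 30-minute timeout from the original question (the example trigger uses 30 seconds) and sorted arrival times; `TimeoutTriggerModel`, `advanceClock` and `replay` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, Flink-free model of the quoted TimeoutTrigger, driven by a
 * fake clock so its behaviour can be checked in a plain unit test.
 * Assumes a 30-minute timeout instead of the 30 seconds in the example code.
 */
final class TimeoutTriggerModel {
    static final long TIMEOUT = TimeUnit.MINUTES.toMillis(30); // 1,800,000 ms

    private long timer = -1; // the single timer; -1 means none

    /** Like onElement: replaces the one timer with now + TIMEOUT (CONTINUE). */
    void onElement(long now) {
        timer = now + TIMEOUT;
    }

    /** Advances the fake clock; returns the timer time if it fired (FIRE_AND_PURGE), else -1. */
    long advanceClock(long now) {
        if (timer >= 0 && now >= timer) {
            long firedAt = timer;
            timer = -1;
            return firedAt;
        }
        return -1;
    }

    /** Replays sorted element arrival times and returns the times at which the window fired. */
    static List<Long> replay(long[] arrivals, long endOfTest) {
        TimeoutTriggerModel trigger = new TimeoutTriggerModel();
        List<Long> fired = new ArrayList<>();
        for (long now : arrivals) {
            long firedAt = trigger.advanceClock(now);
            if (firedAt >= 0) {
                fired.add(firedAt);
            }
            trigger.onElement(now);
        }
        long firedAt = trigger.advanceClock(endOfTest);
        if (firedAt >= 0) {
            fired.add(firedAt);
        }
        return fired;
    }

    public static void main(String[] args) {
        long minute = TimeUnit.MINUTES.toMillis(1);
        // clicks at 0, 10 and 25 minutes, then again after a 45-minute pause
        long[] arrivals = {0, 10 * minute, 25 * minute, 70 * minute};
        for (long t : replay(arrivals, 200 * minute)) {
            System.out.println("fired at minute " + t / minute);
        }
    }
}
```

With clicks at minutes 0, 10, 25 and 70, `main` prints firings at minutes 55 and 100: the first three clicks keep pushing the single timer forward, and the 45-minute pause closes the first visit.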
> >
> > Cheers,
> > Aljoscha
> >
> > On 26 Nov 2015, at 22:13, Niels Basjes <ni...@basjes.nl> wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to build something in Flink that relies heavily on the
> > > Windowing features.
> > >
> > > In essence, this is what I want to build:
> > > I have clickstream data coming in via Kafka. Each record (click) has a
> > > sessionid and a timestamp.
> > > I want to create a window for each session, and after 30 minutes idle I
> > > want all events for that session (visit) to be written to disk.
> > > This should ensure that a specific visit exists in exactly one file.
> > > Since HDFS does not like 'small files', I want to create a (set of)
> > > files every 15 minutes that contains several complete visits.
> > > So I need to buffer the 'completed visits' and flush them to disk in
> > > 15-minute batches.
> > >
> > > What I think I need to get this is:
> > > 1) A map function that assigns the visit-id (i.e. a new id after 30
> > >    minutes idle)
> > > 2) A window per visit-id (close the window 30 minutes after the last
> > >    click)
> > > 3) A window per 15 minutes that only contains windows of visits that
> > >    are complete
> > >
> > > Today I've been trying to get this set up and I think I have some parts
> > > that are heading in the right direction.
> > >
> > > I have some questions and I'm hoping you guys can help me:
> > >
> > > 1) I have trouble understanding the way a windowed stream works
> > >    "exactly". As a consequence I'm having a hard time verifying whether
> > >    my code does what I understand it should do.
> > >    I guess what would really help me is a very simple example of how to
> > >    unit test such a window.
> > >
> > > 2) Has what I describe above perhaps already been done before? If so,
> > >    any pointers are really appreciated.
> > >
> > > 3) Am I working in the right direction for what I'm trying to achieve,
> > >    or should I use a different API? A different approach?
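Steps 1 and 3 of that plan can be prototyped outside Flink first, which also gives something small to unit test. A minimal sketch, assuming the clicks for one sessionid arrive as a sorted array of millisecond timestamps, that a gap of 30 minutes or more starts a new visit, and that a visit counts as complete 30 minutes after its last click; `VisitBatcher`, `splitVisits` and `batchCompleted` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical batch prototype of steps 1 and 3 for the clicks of ONE sessionid.
 * Not Flink API: names and thresholds are assumptions for illustration.
 */
final class VisitBatcher {
    static final long IDLE = 30 * 60_000L;  // a gap of 30 minutes or more starts a new visit
    static final long BATCH = 15 * 60_000L; // completed visits are grouped per 15 minutes

    /** Step 1: split sorted click times (ms) into visits on idle gaps. */
    static List<List<Long>> splitVisits(long[] sortedClicks) {
        List<List<Long>> visits = new ArrayList<>();
        List<Long> current = null;
        long previous = 0;
        for (long t : sortedClicks) {
            if (current == null || t - previous >= IDLE) {
                current = new ArrayList<>();
                visits.add(current);
            }
            current.add(t);
            previous = t;
        }
        return visits;
    }

    /** Step 3: a visit completes IDLE ms after its last click; group by 15-minute batch start. */
    static Map<Long, List<List<Long>>> batchCompleted(List<List<Long>> visits) {
        Map<Long, List<List<Long>>> batches = new TreeMap<>();
        for (List<Long> visit : visits) {
            long completedAt = visit.get(visit.size() - 1) + IDLE;
            long batchStart = completedAt - Math.floorMod(completedAt, BATCH);
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(visit);
        }
        return batches;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] clicks = {0, 5 * minute, 20 * minute, 60 * minute, 61 * minute};
        Map<Long, List<List<Long>>> batches = batchCompleted(splitVisits(clicks));
        for (Map.Entry<Long, List<List<Long>>> e : batches.entrySet()) {
            System.out.println("batch starting at minute " + e.getKey() / minute
                    + ": " + e.getValue().size() + " complete visit(s)");
        }
    }
}
```

Because each visit is assigned to exactly one batch (the 15-minute interval in which it completes), writing one file per batch would keep every visit in exactly one file. For the sample clicks, `main` reports one visit in the batch starting at minute 45 and one in the batch starting at minute 90.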
> > >
> > > Thanks
> > >
> > > --
> > > Best regards / Met vriendelijke groeten,
> > >
> > > Niels Basjes