Hi Niels,

do the records that arrive from Kafka already have a session ID, or do you want to assign it inside your Flink job based on the idle timeout?
For the rest of your problems you should be able to get by with what Flink provides. The triggering can be done using a custom Trigger that fires once no element has been seen for a configurable timeout (30 minutes in your case):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class TimeoutTrigger implements Trigger<Object, Window> {
    private static final long serialVersionUID = 1L;

    @Override
    public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
        // on every element, set a processing-time timer for 30 seconds in the future;
        // a trigger can only have one timer, so registering a new one replaces the old one
        ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // 30 seconds; change this to your timeout
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
        // the timer fired, i.e. no new element arrived within the timeout
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public String toString() {
        return "TimeoutTrigger()";
    }
}

You would use it like this:

stream.keyBy(…).window(…).trigger(new TimeoutTrigger())

For writing to files you could use the RollingSink (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem). I think this does pretty much what you want: you can specify how large the files it writes should be, and it can also roll over to new files on a specified time interval.

Please let us know if you need more information.

Cheers,
Aljoscha

> On 26 Nov 2015, at 22:13, Niels Basjes <ni...@basjes.nl> wrote:
>
> Hi,
>
> I'm trying to build something in Flink that relies heavily on the windowing features.
>
> In essence what I want to build:
> I have clickstream data coming in via Kafka. Each record (click) has a session ID and a timestamp.
> I want to create a window for each session, and after 30 minutes of idle time I want all events for that session (visit) to be written to disk.
> This should result in the effect that a specific visit exists in exactly one file.
> Since HDFS does not like 'small files', I want to create a (set of) files every 15 minutes that contain several complete visits.
> So I need to buffer the 'completed visits' and flush them to disk in 15-minute batches.
>
> What I think I need to get this:
> 1) A map function that assigns the visit ID (i.e. a new ID after 30 minutes of idle time)
> 2) A window per visit ID (close the window 30 minutes after the last click)
> 3) A window per 15 minutes that only contains windows of visits that are complete
>
> Today I've been trying to get this set up, and I think I have some parts that are going in the right direction.
>
> I have some questions and I'm hoping you guys can help me:
>
> 1) I have trouble understanding exactly how a windowed stream works.
> As a consequence I'm having a hard time verifying whether my code does what I understand it should do.
> I guess what would really help me is a very simple example of how to unit-test such a window.
>
> 2) Has what I describe above perhaps been done before? If so, any pointers are really appreciated.
>
> 3) Am I working in the right direction for what I'm trying to achieve, or should I use a different API or a different approach?
>
> Thanks
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>
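P.S. In case it helps to see the pieces together: below is a rough, untested sketch of how the trigger could be wired into a per-session pipeline. It assumes your records are (sessionId, timestamp) pairs and uses a GlobalWindows assigner so that the TimeoutTrigger alone decides when a session closes; the WindowFunction body is a stand-in, so replace it with whatever assembles your actual visit records.

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// stands in for your Kafka source: (sessionId, timestamp) pairs
DataStream<Tuple2<String, Long>> clicks = env.fromElements(
        Tuple2.of("session-1", 1000L),
        Tuple2.of("session-1", 2000L),
        Tuple2.of("session-2", 1500L));

// one global window per session id; the TimeoutTrigger closes it
// (FIRE_AND_PURGE) once no click has arrived within the timeout
DataStream<String> visits = clicks
        .keyBy(0)
        .window(GlobalWindows.create())
        .trigger(new TimeoutTrigger())
        .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, GlobalWindow>() {
            @Override
            public void apply(Tuple key, GlobalWindow window,
                              Iterable<Tuple2<String, Long>> clicksInVisit,
                              Collector<String> out) {
                // turn the buffered clicks into a single visit record;
                // here we just count them as a placeholder
                int count = 0;
                for (Tuple2<String, Long> click : clicksInVisit) {
                    count++;
                }
                out.collect(key + " -> " + count + " clicks");
            }
        });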
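And to write those visits out, a minimal RollingSink sketch along the lines of the documentation linked above; the base path, the bucketer pattern, and the batch size are placeholders you would adjust:

import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

// write completed visits to (e.g.) HDFS: the DateTimeBucketer starts a new
// bucket directory whenever the formatted current time changes, and
// setBatchSize rolls to a new part file once the current one gets too big
RollingSink<String> sink = new RollingSink<String>("/base/path");  // placeholder path
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));        // placeholder pattern
sink.setBatchSize(1024L * 1024 * 400);                             // roll at ~400 MB per part file

visits.addSink(sink);
env.execute("sessionize clicks"); // hypothetical job name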