Hi Niels,
do the records that arrive from Kafka already have the session ID, or do you 
want to assign it inside your Flink job based on the idle timeout?

For the rest of your problems you should be able to get by with what Flink 
provides:

The triggering can be done using a custom Trigger that fires once we haven't 
seen an element for a given idle period (the example below uses 30 seconds; 
for your case you would use 30 minutes):
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class TimeoutTrigger implements Trigger<Object, Window> {
   private static final long serialVersionUID = 1L;

   @Override
   public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
      // on every element, set a timer for 30 seconds in the future;
      // a trigger can only have one timer, so registering a new timer
      // replaces the old one
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // 30 seconds, adjust as needed
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
      // we only react to processing time here
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
      // the timer fired, i.e. no new element arrived within the timeout:
      // emit the window contents and clear the window state
      return TriggerResult.FIRE_AND_PURGE;
   }

   @Override
   public String toString() {
      return "TimeoutTrigger()";
   }
}

You would use it like this:
stream.keyBy(…).window(…).trigger(new TimeoutTrigger())
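
To make that more concrete, here is a minimal sketch of the wiring. GlobalWindows, the "sessionId" key, and CollectVisitFunction are assumptions for illustration: GlobalWindows never ends a window on its own, so the TimeoutTrigger alone decides when a session is emitted, and CollectVisitFunction stands in for your own WindowFunction.

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;

stream
   .keyBy("sessionId")                 // placeholder: the session id field of your records
   .window(GlobalWindows.create())     // assigner that never fires on its own
   .trigger(new TimeoutTrigger())      // firing is decided solely by the idle timeout
   .apply(new CollectVisitFunction()); // placeholder: a WindowFunction that bundles one visit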

For writing to files you could use the RollingSink 
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem). 
I think this does pretty much what you want: you can specify how large the 
files it writes may get, and it can also roll over to new files on a 
specified time interval.
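
A minimal configuration sketch, assuming the flink-connector-filesystem dependency is available; the base path, bucket format, and batch size are placeholders to adapt:

import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

RollingSink<String> sink = new RollingSink<>("/base/path");  // placeholder HDFS base path
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));  // new bucket directory per time interval
sink.setBatchSize(128 * 1024 * 1024);                        // roll to a new part file after ~128 MB
visits.addSink(sink);                                        // "visits": the stream of completed visits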

Please let us know if you need more information.

Cheers,
Aljoscha
> On 26 Nov 2015, at 22:13, Niels Basjes <ni...@basjes.nl> wrote:
> 
> Hi,
> 
> I'm trying to build something in Flink that relies heavily on the Windowing 
> features.
> 
> In essence what I want to build:
> I have clickstream data coming in via Kafka. Each record (click) has a 
> sessionid and a timestamp.
> I want to create a window for each session and after 30 minutes idle I want 
> all events for that session (visit) to be written to disk.
> This should result in the effect that a specific visit exists in exactly one 
> file.
> Since HDFS does not like 'small files' I want to create a (set of) files 
> every 15 minutes that contain several complete visits.
> So I need to buffer the 'completed visits' and flush them to disk in 15 
> minute batches.
> 
> What I think I need to get this is:
> 1) A map function that assigns the visit-id (i.e. new id after 30 minutes 
> idle)
> 2) A window per visit-id (close the window 30 minutes after the last click) 
> 3) A window per 15 minutes that only contains windows of visits that are 
> complete 
> 
> Today I've been trying to get this setup and I think I have some parts that 
> are in the right direction.
> 
> I have some questions and I'm hoping you guys can help me:
> 
> 1) I have trouble understanding exactly how a windowed stream works. 
> As a consequence I'm having a hard time verifying that my code does what I 
> understand it should do. 
> I guess what would really help me is a very simple example on how to unittest 
> such a window.
> 
> 2) Has what I describe above perhaps been done before? If so, any 
> pointers are really appreciated.
> 
> 3) Am I working in the right direction for what I'm trying to achieve, or 
> should I use a different API or a different approach?
> 
> Thanks
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 
