Hi,

The first thing I noticed is that the Session object keeps a list of all of its events in memory. Your events are really small, but in my scenario I expect more than 1000 events per session, each in the 512-1024 byte range. This worried me, but I decided to give your code a run anyway.
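As a rough back-of-the-envelope sketch of that concern: the snippet below is plain Java with no Flink involved, the class and method names are made up for illustration, and object and serialization overhead is ignored. It computes the raw payload buffered per session and how many such sessions fit under the 5242880-byte maxSize reported in the exception further down.

```java
// Back-of-the-envelope estimate only; SessionStateEstimate is a hypothetical helper,
// not part of Flink or of the sessionization example.
final class SessionStateEstimate {

    // maxSize as reported by the MemoryStateBackend exception (5 MiB).
    static final long MAX_MEMORY_STATE_BYTES = 5_242_880L;

    /** Raw payload bytes buffered for one session, ignoring any per-object overhead. */
    static long bytesPerSession(long eventsPerSession, long bytesPerEvent) {
        return eventsPerSession * bytesPerEvent;
    }

    /** Number of complete sessions whose raw payload fits under the limit. */
    static long sessionsThatFit(long eventsPerSession, long bytesPerEvent) {
        return MAX_MEMORY_STATE_BYTES / bytesPerSession(eventsPerSession, bytesPerEvent);
    }

    public static void main(String[] args) {
        System.out.println(bytesPerSession(1000, 512));   // 512000
        System.out.println(bytesPerSession(1000, 1024));  // 1024000
        System.out.println(sessionsThatFit(1000, 512));   // 10
        System.out.println(sessionsThatFit(1000, 1024));  // 5
    }
}
```

So with events of that size, somewhere between 5 and 10 open sessions of 1000 events already reach that limit.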
After a while running it in my IDE (not on cluster) I got this:

17:18:46,336 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 269 @ 1448986726336
17:18:46,587 INFO  org.apache.flink.runtime.taskmanager.Task - sessionization -> Sink: Unnamed (4/4) switched to FAILED with exception.
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:577)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:570)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:201)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:127)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:173)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5246277 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:130)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkpointStateSerializable(MemoryStateBackend.java:108)
    at com.dataartisans.streaming.sessionization.SessionizingOperator.snapshotOperatorState(SessionizingOperator.java:162)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:440)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:574)
    ... 8 more

Niels

On Tue, Dec 1, 2015 at 4:41 PM, Niels Basjes <ni...@basjes.nl> wrote:
> Thanks!
> I'm going to study this code closely!
>
> Niels
>
> On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Niels!
>>
>> I have a pretty nice example for you here:
>> https://github.com/StephanEwen/sessionization
>>
>> It keeps only one state and has the structure:
>>
>>
>> (source) --> (window sessions) ---> (real time sink)
>>                     |
>>                     +--> (15 minute files)
>>
>>
>> The real time sink gets the event with attached visitId immediately. The
>> session operator, as a side effect, writes out the 15 minute files with
>> sessions that expired in that time.
>>
>>
>> It is not a lot of code, the two main parts are
>>
>> - the program and the program skeleton:
>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>> - the sessionizing and file writing operator:
>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>>
>>
>> The example runs fully on event time, where the timestamps are extracted
>> from the records. That makes this program very robust (no issue with
>> clocks, etc).
>>
>> Also, here comes the amazing part: The same program should do "replay"
>> and real time. The only difference is what input you give it. Since time is
>> event time, it can do both.
>>
>>
>> One note:
>> - Event Time Watermarks are the mechanism to signal progress in event
>> time. It is simple here, because I assume that timestamps are ascending in
>> a Kafka partition. If that is not the case, you need to implement a more
>> elaborate TimestampExtractor.
>>
>>
>> Hope you can work with this!
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Just for clarification: The real-time results should also contain the
>>> visitId, correct?
>>>
>>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi Niels!
>>>>
>>>> If you want to use the built-in windowing, you probably need two windows:
>>>> - One for ID assignment (that immediately pipes elements through)
>>>> - One for accumulating session elements, and then piping them into
>>>> files upon session end.
>>>>
>>>> You may be able to use the rolling file sink (roll by 15 minutes) to
>>>> store the files.
>>>> That is probably the simplest to implement and will serve the real time
>>>> case.
>>>>
>>>>
>>>>                                     +--> (real time sink)
>>>>                                     |
>>>> (source) --> (window session ids) --+
>>>>                                     |
>>>>                                     +--> (window session) --> (rolling sink)
>>>>
>>>>
>>>> You can put this all into one operator that accumulates the session
>>>> elements but still immediately emits the new records (the realtime path),
>>>> if you implement your own windowing/buffering in a custom function.
>>>> This is also very easy to put onto event time then, which makes it
>>>> valuable to process the history (replay). For this second case, still
>>>> prototyping some code for the event time case, give me a bit, I'll get back
>>>> at you...
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> I created a first version of the Visit ID assignment like this:
>>>>>
>>>>> First I group by sessionid and I create a Window per visit.
>>>>> The custom Trigger for this window does a 'FIRE' after each element
>>>>> and sets an EventTimer on the 'next possible moment the visit can expire'.
>>>>> To avoid getting 'all events' in the visit after every 'FIRE' I'm
>>>>> using CountEvictor.of(1).
>>>>> When the visit expires I do a PURGE. So if there are more events
>>>>> afterwards for the same sessionId I get a new visit (which is exactly what
>>>>> I want).
>>>>>
>>>>> The last step I do is I want to have a 'normal' DataStream again to
>>>>> work with.
>>>>> I created this WindowFunction to map the Window stream back to normal
>>>>> DataStream
>>>>> Essentially I do this:
>>>>>
>>>>> DataStream<Foo> visitDataStream = visitWindowedStream.apply(new WindowToStream<Foo>());
>>>>>
>>>>> // This is an identity 'apply'
>>>>> private static class WindowToStream<T> implements WindowFunction<T, T, String, GlobalWindow> {
>>>>>     @Override
>>>>>     public void apply(String s, GlobalWindow window, Iterable<T> values, Collector<T> out) throws Exception {
>>>>>         for (T value: values) {
>>>>>             out.collect(value);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> The problem with this is that I first create the visitIds in a Window
>>>>> (great).
>>>>> Because I really need to have both the Windowed events AND the near
>>>>> realtime version I currently break down the Window to get the single events
>>>>> and after that I have to recreate the same Window again.
>>>>>
>>>>> I'm looking forward to the implementation direction you are referring
>>>>> to. I hope you have a better way of doing this.
>>>>>
>>>>> Niels Basjes
>>>>>
>>>>>
>>>>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> Hi Niels!
>>>>>>
>>>>>> Nice use case that you have!
>>>>>> I think you can solve this super nicely with Flink, such that
>>>>>> "replay" and "realtime" are literally the same program - they differ only
>>>>>> in whether
>>>>>>
>>>>>> Event time is, like you said, the key thing for "replay". Event time
>>>>>> depends on the progress in the timestamps of the data, so it can progress
>>>>>> at different speeds, depending on what the rate of your stream is.
>>>>>> With the appropriate data source, it will progress very fast in
>>>>>> "replay mode", so that you replay in "fast forward speed", and it
>>>>>> progresses at the same speed as processing time when you attach to the end
>>>>>> of the Kafka queue.
>>>>>>
>>>>>> When you define the time intervals in your program to react to event
>>>>>> time progress, then you will compute the right sessionization in both
>>>>>> replay and real time settings.
>>>>>>
>>>>>> I am writing a little example code to share. The type of
>>>>>> ID-assignment sessions you want to do needs an undocumented API right now,
>>>>>> so I'll prepare something there for you...
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> The sessionid is present in the measurements. It can also be seen as
>>>>>>> a form of 'browser id'.
>>>>>>> Most websites use either a 'long lived random value in a cookie' or
>>>>>>> an 'application session id' for this.
>>>>>>>
>>>>>>> So with the id of the browser in hand I have the need to group all
>>>>>>> events into "periods of activity" which I call a visit.
>>>>>>> Such a visit is a bounded subset of all events from a single browser.
>>>>>>>
>>>>>>> What I need is to add a (sort of) random visit id to the events that
>>>>>>> becomes 'inactive' after more than X minutes of inactivity.
>>>>>>> I then want to add this visitid to each event and
>>>>>>> 1) stream them out in realtime
>>>>>>> 2) Wait till the visit ends and store the complete visit on disk (I
>>>>>>> am going for either AVRO or Parquet).
>>>>>>>
>>>>>>> I want to create diskfiles with all visits that ended in a specific
>>>>>>> time period. So essentially
>>>>>>> "Group by round(<timestamp of last event>, 15 minutes)"
>>>>>>>
>>>>>>>
>>>>>>> Because of the need to be able to 'repair' things I came up with the
>>>>>>> following question:
>>>>>>> In the Flink API I see the 'process time' (i.e. the actual time of
>>>>>>> the server) and the 'event time' (i.e. the time when an event was
>>>>>>> recorded).
>>>>>>>
>>>>>>> Now in my case all events are in Kafka (for say 2 weeks).
>>>>>>> When something goes wrong I want to be able to 'reprocess'
>>>>>>> everything from the start of the queue.
>>>>>>> Here the matter of 'event time' becomes a big question for me; In
>>>>>>> those 'replay' situations the event time will progress at a much higher
>>>>>>> speed than the normal 1sec/sec.
>>>>>>>
>>>>>>> How does this work in Apache Flink?
>>>>>>>
>>>>>>>
>>>>>>> Niels Basjes
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hey Niels!
>>>>>>>>
>>>>>>>> You may be able to implement this in windows anyways, depending on
>>>>>>>> your setup. You can definitely implement state with timeout yourself
>>>>>>>> (using the more low-level state interface), or you may be able to use custom
>>>>>>>> windows for that (they can trigger on every element and return elements
>>>>>>>> immediately, thereby giving you low latency).
>>>>>>>>
>>>>>>>> Can you tell me where exactly the session ID comes from? Is that
>>>>>>>> something that the function with state generates itself?
>>>>>>>> Depending on that answer, I can outline either the window, or the
>>>>>>>> custom state way...
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thanks for the explanation.
>>>>>>>>> I have clickstream data arriving in realtime and I need to assign
>>>>>>>>> the visitId and stream it out again (with the visitId now being part
>>>>>>>>> of the record) into Kafka with the lowest possible latency.
>>>>>>>>> Although the Window feature allows me to group and close the visit
>>>>>>>>> on a timeout/expire (as shown to me by Aljoscha in a separate email) it
>>>>>>>>> does make a 'window'.
>>>>>>>>>
>>>>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>>>>
>>>>>>>>> Niels
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Niels!
>>>>>>>>>>
>>>>>>>>>> Currently, state is released by setting the value for the key to
>>>>>>>>>> null. If you are tracking web sessions, you can try and send an "end of
>>>>>>>>>> session" element that sets the value to null.
>>>>>>>>>>
>>>>>>>>>> To be on the safe side, you probably want state that is
>>>>>>>>>> automatically purged after a while. I would look into using Windows for
>>>>>>>>>> that. The triggers there are flexible so you can schedule both actions on
>>>>>>>>>> elements plus cleanup after a certain time delay (clock time or
>>>>>>>>>> event time).
>>>>>>>>>>
>>>>>>>>>> The question about "state expiry" has come up a few times. People
>>>>>>>>>> seem to like working on state directly, but it should clean up
>>>>>>>>>> automatically.
>>>>>>>>>>
>>>>>>>>>> Can you see if your use case fits onto windows, otherwise open a
>>>>>>>>>> ticket for state expiry?
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm working on a streaming application that ingests clickstream
>>>>>>>>>>> data.
>>>>>>>>>>> In a specific part of the flow I need to retain a little bit of
>>>>>>>>>>> state per visitor (i.e. keyBy(sessionid) )
>>>>>>>>>>>
>>>>>>>>>>> So I'm using the Key/Value state interface (i.e. OperatorState<MyRecord>)
>>>>>>>>>>> in a map function.
>>>>>>>>>>>
>>>>>>>>>>> Now in my application I expect to get a huge number of sessions
>>>>>>>>>>> per day.
>>>>>>>>>>> Since these sessionids are 'random' and become unused after the
>>>>>>>>>>> visitor leaves the website over time the system will have seen millions of
>>>>>>>>>>> those sessionids.
>>>>>>>>>>>
>>>>>>>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>>
>>>>>>>>>>> Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes
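
For reference, the visit logic discussed in this thread can be sketched in plain Java: a new visit id once a session has been inactive for more than X minutes, driven purely by event timestamps, plus the 15-minute grouping of finished visits. This is only an illustration under stated assumptions, not the code from the sessionization repository and not a Flink API. The class VisitSketch and all of its members are hypothetical, a counter stands in for the "(sort of) random" visit id, timestamps are assumed to be ascending per session, and "round" is interpreted as rounding down to the start of the 15-minute bucket.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch; VisitSketch and all of its members are hypothetical names,
// not part of Flink or of the sessionization example repository.
final class VisitSketch {

    static final long FIFTEEN_MINUTES_MS = 15 * 60 * 1000L;

    /** State kept per session id: the current visit and its latest event time. */
    private static final class OpenVisit {
        final long visitId;
        long lastEventMs;

        OpenVisit(long visitId, long lastEventMs) {
            this.visitId = visitId;
            this.lastEventMs = lastEventMs;
        }
    }

    private final long timeoutMs;
    private final Map<String, OpenVisit> openVisits = new HashMap<>();
    private long nextVisitId = 1; // a counter stands in for the "(sort of) random" id

    VisitSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Returns the visit id for this event; assumes ascending timestamps per session. */
    long assign(String sessionId, long eventTimeMs) {
        OpenVisit visit = openVisits.get(sessionId);
        if (visit == null || eventTimeMs - visit.lastEventMs > timeoutMs) {
            // First event of the session, or the inactivity gap was exceeded: start a new visit.
            visit = new OpenVisit(nextVisitId++, eventTimeMs);
            openVisits.put(sessionId, visit);
        }
        visit.lastEventMs = eventTimeMs;
        return visit.visitId;
    }

    /** Drops visits idle for longer than the timeout as of the given event-time watermark. */
    int expire(long watermarkMs) {
        int before = openVisits.size();
        openVisits.values().removeIf(v -> watermarkMs - v.lastEventMs > timeoutMs);
        return before - openVisits.size();
    }

    /** Start of the 15-minute bucket containing a non-negative timestamp in milliseconds. */
    static long bucketStart(long timestampMs) {
        return timestampMs - (timestampMs % FIFTEEN_MINUTES_MS);
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        VisitSketch visits = new VisitSketch(30 * minute);
        System.out.println(visits.assign("a", 0));             // 1
        System.out.println(visits.assign("a", 10 * minute));   // 1 (gap of 10 minutes)
        System.out.println(visits.assign("b", 11 * minute));   // 2 (different session)
        System.out.println(visits.assign("a", 41 * minute));   // 3 (gap of 31 minutes)
        System.out.println(visits.expire(50 * minute));        // 1 (session "b" expired)
        System.out.println(bucketStart(17 * minute));          // 900000
    }
}
```

Because both assign and expire look only at event timestamps and never at the wall clock, the same logic gives the same visit ids in replay and in real time. The expire call is what keeps the per-session state from growing without bound.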