Hi! If you want to run with checkpoints (fault tolerance), you need to specify a place to store the checkpoints.

By default, it is the master's memory (or ZooKeeper in HA), so we put a limit on the size of the state there. To use larger state, simply configure a different place to store checkpoints, and you can grow your state as large as your memory permits:

  env.setStateBackend(new FsStateBackend("hdfs:///data/flink-checkpoints"));

or

  env.setStateBackend(new FsStateBackend("file:///data/flink-checkpoints"));

More information on that is in the docs:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/state_backends.html
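For example, a minimal setup could look like this (the checkpoint interval and the path are just placeholders, adjust them to your job):

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // checkpoint every 10 seconds (placeholder interval)
  env.enableCheckpointing(10000);

  // keep checkpoint data on a file system instead of the JobManager's memory
  env.setStateBackend(new FsStateBackend("file:///data/flink-checkpoints"));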

Greetings,
Stephan

On Tue, Dec 1, 2015 at 5:23 PM, Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> The first thing I noticed is that the Session object maintains a list of all events in memory.
> Your events are really small, yet in my scenario the predicted number of events per session will be above 1000 and each is expected to be in the 512-1024 bytes range.
> This worried me, yet I decided to give your code a run.
>
> After a while running it in my IDE (not on cluster) I got this:
>
> 17:18:46,336 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 269 @ 1448986726336
> 17:18:46,587 INFO  org.apache.flink.runtime.taskmanager.Task  - sessionization -> Sink: Unnamed (4/4) switched to FAILED with exception.
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:577)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:570)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:201)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:127)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:173)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5246277, maxSize=5242880. Consider using a different state backend, like the File System State backend.
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:130)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkpointStateSerializable(MemoryStateBackend.java:108)
>     at com.dataartisans.streaming.sessionization.SessionizingOperator.snapshotOperatorState(SessionizingOperator.java:162)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:440)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:574)
>     ... 8 more
>
> Niels
>
> On Tue, Dec 1, 2015 at 4:41 PM, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Thanks!
>> I'm going to study this code closely!
>>
>> Niels
>>
>> On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> I have a pretty nice example for you here:
>>> https://github.com/StephanEwen/sessionization
>>>
>>> It keeps only one state and has the structure:
>>>
>>> (source) --> (window sessions) ---> (real time sink)
>>>                     |
>>>                     +--> (15 minute files)
>>>
>>> The real time sink gets the event with attached visitId immediately. The session operator, as a side effect, writes out the 15 minute files with sessions that expired in that time.
>>>
>>> It is not a lot of code; the two main parts are:
>>>
>>> - the program and the program skeleton:
>>>   https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>>> - the sessionizing and file writing operator:
>>>   https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>>>
>>> The example runs fully on event time, where the timestamps are extracted from the records. That makes this program very robust (no issues with clocks, etc.).
>>>
>>> Also, here comes the amazing part: the same program should do "replay" and real time. The only difference is what input you give it. Since time is event time, it can do both.
>>>
>>> One note:
>>> - Event Time Watermarks are the mechanism to signal progress in event time. It is simple here, because I assume that timestamps are ascending in a Kafka partition. If that is not the case, you need to implement a more elaborate TimestampExtractor.
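>>>
>>> As a rough, untested sketch of hooking such an extractor in (double-check the exact TimestampExtractor signature in the 0.10 javadocs; env and kafkaStream are the job's environment and Kafka source, and MyEvent/getTimestamp() are placeholders for your record type):
>>>
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>     DataStream<MyEvent> withTimestamps = kafkaStream.assignTimestamps(
>>>         new TimestampExtractor<MyEvent>() {
>>>
>>>             private long lastTimestamp = Long.MIN_VALUE;
>>>
>>>             @Override
>>>             public long extractTimestamp(MyEvent element, long currentTimestamp) {
>>>                 // timestamps ascend per Kafka partition, so the latest one is the event time
>>>                 lastTimestamp = element.getTimestamp();
>>>                 return lastTimestamp;
>>>             }
>>>
>>>             @Override
>>>             public long extractWatermark(MyEvent element, long currentTimestamp) {
>>>                 // no per-element watermark; rely on the periodic one below
>>>                 return Long.MIN_VALUE;
>>>             }
>>>
>>>             @Override
>>>             public long getCurrentWatermark() {
>>>                 // polled periodically by the runtime; everything up to the last timestamp is complete
>>>                 return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - 1;
>>>             }
>>>         });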
>>>
>>> Hope you can work with this!
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Just for clarification: The real-time results should also contain the visitId, correct?
>>>>
>>>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi Niels!
>>>>>
>>>>> If you want to use the built-in windowing, you probably need two windows:
>>>>> - One for ID assignment (that immediately pipes elements through)
>>>>> - One for accumulating session elements, and then piping them into files upon session end.
>>>>>
>>>>> You may be able to use the rolling file sink (roll by 15 minutes) to store the files.
>>>>> That is probably the simplest to implement and will serve the real time case.
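>>>>>
>>>>> Roughly like this, as an untested sketch (RollingSink and DateTimeBucketer come from the flink-connector-filesystem module; the path is a placeholder, sessionStream stands for the stream of finished sessions, and the bucketer below gives per-minute buckets - a custom Bucketer would give real 15 minute buckets):
>>>>>
>>>>>     RollingSink<String> fileSink = new RollingSink<String>("hdfs:///data/visits");
>>>>>     fileSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
>>>>>     sessionStream.addSink(fileSink);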
>>>>>
>>>>>                                      +--> (real time sink)
>>>>>                                      |
>>>>> (source) --> (window session ids) --+
>>>>>                                      |
>>>>>                                      +--> (window session) --> (rolling sink)
>>>>>
>>>>> You can put this all into one operator that accumulates the session elements but still immediately emits the new records (the realtime path), if you implement your own windowing/buffering in a custom function.
>>>>> This is also very easy to put onto event time then, which makes it valuable to process the history (replay). For this second case, I'm still prototyping some code for the event time case; give me a bit, I'll get back to you...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> I created a first version of the Visit ID assignment like this:
>>>>>>
>>>>>> First I group by sessionid and I create a Window per visit.
>>>>>> The custom Trigger for this window does a 'FIRE' after each element and sets an EventTimer on the 'next possible moment the visit can expire'.
>>>>>> To avoid getting 'all events' in the visit after every 'FIRE' I'm using CountEvictor.of(1).
>>>>>> When the visit expires I do a PURGE. So if there are more events afterwards for the same sessionId I get a new visit (which is exactly what I want).
>>>>>>
>>>>>> The last step is that I want to have a 'normal' DataStream again to work with.
>>>>>> I created this WindowFunction to map the Window stream back to a normal DataStream. Essentially I do this:
>>>>>>
>>>>>>     DataStream<Foo> visitDataStream = visitWindowedStream.apply(new WindowToStream<Foo>());
>>>>>>
>>>>>>     // This is an identity 'apply'
>>>>>>     private static class WindowToStream<T> implements WindowFunction<T, T, String, GlobalWindow> {
>>>>>>         @Override
>>>>>>         public void apply(String s, GlobalWindow window, Iterable<T> values, Collector<T> out) throws Exception {
>>>>>>             for (T value : values) {
>>>>>>                 out.collect(value);
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> The problem with this is that I first create the visitIds in a Window (great).
>>>>>> Because I really need to have both the Windowed events AND the near realtime version, I currently break down the Window to get the single events and after that I have to recreate the same Window again.
>>>>>>
>>>>>> I'm looking forward to the implementation direction you are referring to. I hope you have a better way of doing this.
>>>>>>
>>>>>> Niels Basjes
>>>>>>
>>>>>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Niels!
>>>>>>>
>>>>>>> Nice use case that you have!
>>>>>>> I think you can solve this super nicely with Flink, such that "replay" and "realtime" are literally the same program - they differ only in what input you give them.
>>>>>>>
>>>>>>> Event time is, like you said, the key thing for "replay". Event time depends on the progress in the timestamps of the data, so it can progress at different speeds, depending on what the rate of your stream is.
>>>>>>> With the appropriate data source, it will progress very fast in "replay mode", so that you replay at "fast forward speed", and it progresses at the same speed as processing time when you attach to the end of the Kafka queue.
>>>>>>>
>>>>>>> When you define the time intervals in your program to react to event time progress, then you will compute the right sessionization in both replay and real time settings.
>>>>>>>
>>>>>>> I am writing a little example code to share. The type of ID-assignment sessions you want to do needs an undocumented API right now, so I'll prepare something there for you...
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> The sessionid is present in the measurements. It can also be seen as a form of 'browser id'.
>>>>>>>> Most websites use either a 'long lived random value in a cookie' or an 'application session id' for this.
>>>>>>>>
>>>>>>>> So with the id of the browser in hand I have the need to group all events into "periods of activity" which I call a visit.
>>>>>>>> Such a visit is a bounded subset of all events from a single browser.
>>>>>>>>
>>>>>>>> What I need is to add a (sort of) random visit id to the events that becomes 'inactive' after more than X minutes of inactivity.
>>>>>>>> I then want to add this visitid to each event and
>>>>>>>> 1) stream them out in realtime
>>>>>>>> 2) wait till the visit ends and store the complete visit on disk (I am going for either Avro or Parquet).
>>>>>>>>
>>>>>>>> I want to create disk files with all visits that ended in a specific time period. So essentially
>>>>>>>> "Group by round(<timestamp of last event>, 15 minutes)"
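>>>>>>>>
>>>>>>>> To make that rounding concrete, this is the plain arithmetic I have in mind (no Flink API involved; the method name is just for illustration):
>>>>>>>>
>>>>>>>>     // round an epoch-millis timestamp down to the start of its 15 minute bucket
>>>>>>>>     private static long visitBucketStart(long lastEventTimestampMillis) {
>>>>>>>>         final long bucketSizeMillis = 15 * 60 * 1000L;
>>>>>>>>         return (lastEventTimestampMillis / bucketSizeMillis) * bucketSizeMillis;
>>>>>>>>     }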
>>>>>>>>
>>>>>>>> Because of the need to be able to 'repair' things I came up with the following question:
>>>>>>>> In the Flink API I see the 'process time' (i.e. the actual time of the server) and the 'event time' (i.e. the time when an event was recorded).
>>>>>>>>
>>>>>>>> Now in my case all events are in Kafka (for say 2 weeks).
>>>>>>>> When something goes wrong I want to be able to 'reprocess' everything from the start of the queue.
>>>>>>>> Here the matter of 'event time' becomes a big question for me; in those 'replay' situations the event time will progress at a much higher speed than the normal 1 sec/sec.
>>>>>>>>
>>>>>>>> How does this work in Apache Flink?
>>>>>>>>
>>>>>>>> Niels Basjes
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hey Niels!
>>>>>>>>>
>>>>>>>>> You may be able to implement this in windows anyway, depending on your setup. You can definitely implement state with timeout yourself (using the more low-level state interface), or you may be able to use custom windows for that (they can trigger on every element and return elements immediately, thereby giving you low latency).
>>>>>>>>>
>>>>>>>>> Can you tell me where exactly the session ID comes from? Is that something that the function with state generates itself?
>>>>>>>>> Depending on that answer, I can outline either the window or the custom state way...
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Thanks for the explanation.
>>>>>>>>>> I have clickstream data arriving in realtime and I need to assign the visitId and stream it out again (with the visitId now being part of the record) into Kafka with the lowest possible latency.
>>>>>>>>>> Although the Window feature allows me to group and close the visit on a timeout/expire (as shown to me by Aljoscha in a separate email), it does make a 'window'.
>>>>>>>>>>
>>>>>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>>>>>
>>>>>>>>>> Niels
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Niels!
>>>>>>>>>>>
>>>>>>>>>>> Currently, state is released by setting the value for the key to null. If you are tracking web sessions, you can try and send an "end of session" element that sets the value to null.
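>>>>>>>>>>>
>>>>>>>>>>> As a rough sketch (the exact getKeyValueState signature is from memory, so please check the 0.10 state docs; the String types and the "END" marker are just placeholders), a keyed function could release its entry like this:
>>>>>>>>>>>
>>>>>>>>>>>     public class VisitState extends RichFlatMapFunction<String, String> {
>>>>>>>>>>>
>>>>>>>>>>>         private OperatorState<String> visit;
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public void open(Configuration parameters) {
>>>>>>>>>>>             // key/value state, scoped to the key of the preceding keyBy()
>>>>>>>>>>>             visit = getRuntimeContext().getKeyValueState("visit", String.class, null);
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public void flatMap(String event, Collector<String> out) throws Exception {
>>>>>>>>>>>             if (event.endsWith("END")) {
>>>>>>>>>>>                 visit.update(null);      // setting null releases the state for this key
>>>>>>>>>>>             } else {
>>>>>>>>>>>                 visit.update(event);     // keep the latest event as the per-key state
>>>>>>>>>>>             }
>>>>>>>>>>>             out.collect(event);
>>>>>>>>>>>         }
>>>>>>>>>>>     }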
>>>>>>>>>>>
>>>>>>>>>>> To be on the safe side, you probably want state that is automatically purged after a while. I would look into using Windows for that. The triggers there are flexible, so you can schedule both actions on elements plus cleanup after a certain time delay (clock time or event time).
>>>>>>>>>>>
>>>>>>>>>>> The question about "state expiry" has come up a few times. People seem to like working on state directly, but it should clean up automatically.
>>>>>>>>>>>
>>>>>>>>>>> Can you see if your use case fits onto windows, and otherwise open a ticket for state expiry?
>>>>>>>>>>>
>>>>>>>>>>> Greetings,
>>>>>>>>>>> Stephan
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm working on a streaming application that ingests clickstream data.
>>>>>>>>>>>> In a specific part of the flow I need to retain a little bit of state per visitor (i.e. keyBy(sessionid)).
>>>>>>>>>>>>
>>>>>>>>>>>> So I'm using the Key/Value state interface (i.e. OperatorState<MyRecord>) in a map function.
>>>>>>>>>>>>
>>>>>>>>>>>> Now in my application I expect to get a huge number of sessions per day.
>>>>>>>>>>>> Since these sessionids are 'random' and become unused after the visitor leaves the website, over time the system will have seen millions of those sessionids.
>>>>>>>>>>>>
>>>>>>>>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>>>
>>>>>>>>>>>> Niels Basjes
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>
>>>>>>>>>> Niels Basjes
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>
>>>>>>>> Niels Basjes
>>>>>>
>>>>>> --
>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>
>>>>>> Niels Basjes
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes