Hi!

If you want to run with checkpoints (fault tolerance), you need to specify
a place to store the checkpoints to.

By default, it is the master's memory (or zookeeper in HA), so we put a
limit on the size of the size of the state there.

To use larger state, simply configure a different place to store
checkpoints to, and you can grow your size as large as your memory permits:

env.setStateBackend(new FsStateBackend("hdfs:///data/flink-checkpoints"));

or

env.setStateBackend(new FsStateBackend("file:///data/flink-checkpoints"));


More information on that is in the docs:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/state_backends.html

Greetings,
Stephan



On Tue, Dec 1, 2015 at 5:23 PM, Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> The first thing I noticed is that the Session object maintains a list of
> all events in memory.
> Your events are really small yet in my scenario the predicted number of
> events per session will be above 1000 and each is expected to be in the
> 512-1024 bytes range.
> This worried me yet I decided to give your code a run.
>
> After a while running it in my IDE (not on cluster) I got this:
>
> 17:18:46,336 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
> checkpoint 269 @ 1448986726336
> 17:18:46,587 INFO  org.apache.flink.runtime.taskmanager.Task
>       - sessionization -> Sink: Unnamed (4/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error triggering a checkpoint as the result of
> receiving checkpoint barrier
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:577)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:570)
> at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:201)
> at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:127)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:173)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the
> maximum permitted memory-backed state. Size=5246277 , maxSize=5242880 .
> Consider using a different state backend, like the File System State
> backend.
> at
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:130)
> at
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkpointStateSerializable(MemoryStateBackend.java:108)
> at
> com.dataartisans.streaming.sessionization.SessionizingOperator.snapshotOperatorState(SessionizingOperator.java:162)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:440)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:574)
> ... 8 more
>
>
> Niels
>
>
>
> On Tue, Dec 1, 2015 at 4:41 PM, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Thanks!
>> I'm going to study this code closely!
>>
>> Niels
>>
>> On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> I have a pretty nice example for you here:
>>> https://github.com/StephanEwen/sessionization
>>>
>>> It keeps only one state and has the structure:
>>>
>>>
>>> (source) --> (window sessions) ---> (real time sink)
>>>                       |
>>>                       +--> (15 minute files)
>>>
>>>
>>> The real time sink gets the event with attached visitId immediately. The
>>> session operator, as a side effect, writes out the 15 minute files with
>>> sessions that expired in that time.
>>>
>>>
>>> It is not a lot of code, the two main parts are
>>>
>>>   - the program and the program skeleton:
>>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>>>   - the sessionizing and file writing operator:
>>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>>>
>>>
>>> The example runs fully on event time, where the timestamps are extracted
>>> from the records. That makes this program very robust (no issue with
>>> clocks, etc).
>>>
>>> Also, here comes the amazing part: The same program should do "replay"
>>> and real time. The only difference is what input you give it. Since time is
>>> event time, it can do both.
>>>
>>>
>>> One note:
>>>   - Event Time Watermarks are the mechanism to signal progress in event
>>> time. It is simple here, because I assume that timestamps are ascending in
>>> a Kafka partition. If that is not the case, you need to implement a more
>>> elaborate TimestampExtractor.
>>>
>>>
>>> Hope you can work with this!
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Just for clarification: The real-time results should also contain the
>>>> visitId, correct?
>>>>
>>>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi Niels!
>>>>>
>>>>> If you want to use the built-in windowing, you probably need two
>>>>> window:
>>>>>   - One for ID assignment (that immediately pipes elements through)
>>>>>   - One for accumulating session elements, and then piping them into
>>>>> files upon session end.
>>>>>
>>>>> You may be able to use the rolling file sink (roll by 15 minutes) to
>>>>> store the files.
>>>>> That is probably the simplest to implement and will serve the real
>>>>> time case.
>>>>>
>>>>>
>>>>>                                     +--> (real time sink)
>>>>>                                     |
>>>>> (source) --> (window session ids) --+
>>>>>                                     |
>>>>>                                     +--> (window session) --> (rolling
>>>>> sink)
>>>>>
>>>>>
>>>>> You can put this all into one operator that accumulates the session
>>>>> elements but still immediately emits the new records (the realtime path),
>>>>> if you implement your own windowing/buffering in a custom function.
>>>>> This is also very easy to put onto event time then, which makes it
>>>>> valueable to process the history (replay). For this second case, still
>>>>> prototyping some code for the event time case, give me a bit, I'll get 
>>>>> back
>>>>> at you...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> I created a first version of the Visit ID assignment like this:
>>>>>>
>>>>>> First I group by sessionid and I create a Window per visit.
>>>>>> The custom Trigger for this window does a 'FIRE' after each element
>>>>>> and sets an EventTimer on the 'next possible moment the visit can 
>>>>>> expire'.
>>>>>> To avoid getting 'all events' in the visit after every 'FIRE' I'm
>>>>>> using CountEvictor.of(1).
>>>>>> When the visit expires I do a PURGE. So if there are more events
>>>>>> afterwards for the same sessionId I get a new visit (which is exactly 
>>>>>> what
>>>>>> I want).
>>>>>>
>>>>>> The last step I do is I want to have a 'normal' DataStream again to
>>>>>> work with.
>>>>>> I created this WindowFunction to map the Window stream back to
>>>>>>  normal DataStream
>>>>>> Essentially I do this:
>>>>>>
>>>>>> DataStream<Foo> visitDataStream = visitWindowedStream.apply(new
>>>>>> WindowToStream<Foo>())
>>>>>>
>>>>>> // This is an identity 'apply'
>>>>>> private static class WindowToStream<T> implements WindowFunction<T,
>>>>>> T, String, GlobalWindow> {
>>>>>>     @Override
>>>>>>     public void apply(String s, GlobalWindow window, Iterable<T>
>>>>>> values, Collector<T> out) throws Exception {
>>>>>>         for (T value: values) {
>>>>>>             out.collect(value);
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> The problem with this is that I first create the visitIds in a Window
>>>>>> (great).
>>>>>> Because I really need to have both the Windowed events AND the near
>>>>>> realtime version I currently break down the Window to get the single 
>>>>>> events
>>>>>> and after that I have to recreate the same Window again.
>>>>>>
>>>>>> I'm looking forward to the implementation direction you are referring
>>>>>> to. I hope you have a better way of doing this.
>>>>>>
>>>>>> Niels Basjes
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Niels!
>>>>>>>
>>>>>>> Nice use case that you have!
>>>>>>> I think you can solve this super nicely with Flink, such that
>>>>>>> "replay" and "realtime" are literally the same program - they differ 
>>>>>>> only
>>>>>>> in whether
>>>>>>>
>>>>>>> Event time is, like you said, the key thing for "replay". Event time
>>>>>>> depends on the progress in the timestamps of the data, so it can 
>>>>>>> progress
>>>>>>> at different speeds, depending on what the rate of your stream is.
>>>>>>> With the appropriate data source, it will progress very fast in
>>>>>>> "replay mode", so that you replay in "fast forward speed", and it
>>>>>>> progresses at the same speed as processing time when you attach to the 
>>>>>>> end
>>>>>>> of the Kafka queue.
>>>>>>>
>>>>>>> When you define the time intervals in your program to react to event
>>>>>>> time progress, then you will compute the right sessionization in both
>>>>>>> replay and real time settings.
>>>>>>>
>>>>>>> I am writing a little example code to share. The type of
>>>>>>> ID-assignment sessions you want to do need an undocumented API right 
>>>>>>> now,
>>>>>>> so I'll prepare something there for you...
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <ni...@basjes.nl>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> The sessionid is present in the measurements. It can also be seen
>>>>>>>> as a form of 'browser id'.
>>>>>>>> Most websites use either a 'long lived random value in a cookie' or
>>>>>>>> a 'application session id' for this.
>>>>>>>>
>>>>>>>> So with the id of the browser in hand I have the need to group all
>>>>>>>> events into "periods of activity" which I call a visit.
>>>>>>>> Such a visit is a bounded subset of all events from a single
>>>>>>>> browser.
>>>>>>>>
>>>>>>>> What I need is to add a (sort of) random visit id to the events
>>>>>>>> that becomes 'inactive' after more than X minutes of inactivity.
>>>>>>>> I then want to add this visitid to each event and
>>>>>>>> 1) stream them out in realtime
>>>>>>>> 2) Wait till the visit ends and store the complete visit on disk (I
>>>>>>>> am going for either AVRO or Parquet).
>>>>>>>>
>>>>>>>> I want to create diskfiles with all visits that ended in a specific
>>>>>>>> time period. So essentially
>>>>>>>>         "Group by round(<timestamp of last event>, 15 minutes)"
>>>>>>>>
>>>>>>>>
>>>>>>>> Because of the need to be able to 'repair' things I came with the
>>>>>>>> following question:
>>>>>>>> In the Flink API I see the 'process time' (i.e. the actual time of
>>>>>>>> the server) and the 'event time' (i.e. the time when and event was
>>>>>>>> recorded).
>>>>>>>>
>>>>>>>> Now in my case all events are in Kafka (for say 2 weeks).
>>>>>>>> When something goes wrong I want to be able to 'reprocess'
>>>>>>>> everything from the start of the queue.
>>>>>>>> Here the matter of 'event time' becomes a big question for me; In
>>>>>>>> those 'replay' situations the event time will progress at a much higher
>>>>>>>> speed than the normal 1sec/sec.
>>>>>>>>
>>>>>>>> How does this work in Apache Flink?
>>>>>>>>
>>>>>>>>
>>>>>>>> Niels Basjes
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <se...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Niels!
>>>>>>>>>
>>>>>>>>> You may be able to implement this in windows anyways, depending on
>>>>>>>>> your setup. You can definitely implement state with timeout yourself 
>>>>>>>>> (using
>>>>>>>>> the more low-level state interface), or you may be able to use custom
>>>>>>>>> windows for that (they can trigger on every element and return 
>>>>>>>>> elements
>>>>>>>>> immediately, thereby giving you low latency).
>>>>>>>>>
>>>>>>>>> Can you tell me where exactly the session ID comes from? Is that
>>>>>>>>> something that the function with state generates itself?
>>>>>>>>> Depending on that answer, I can outline either the window, or the
>>>>>>>>> custom state way...
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <ni...@basjes.nl>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Thanks for the explanation.
>>>>>>>>>> I have clickstream data arriving in realtime and I need to assign
>>>>>>>>>> the visitId and stream it out again (with the visitId now begin part 
>>>>>>>>>> of the
>>>>>>>>>> record) into Kafka with the lowest possible latency.
>>>>>>>>>> Although the Window feature allows me to group and close the
>>>>>>>>>> visit on a timeout/expire (as shown to me by Aljoscha in a separate 
>>>>>>>>>> email)
>>>>>>>>>> it does make a 'window'.
>>>>>>>>>>
>>>>>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>>>>>
>>>>>>>>>> Niels
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <se...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Niels!
>>>>>>>>>>>
>>>>>>>>>>> Currently, state is released by setting the value for the key to
>>>>>>>>>>> null. If you are tracking web sessions, you can try and send a "end 
>>>>>>>>>>> of
>>>>>>>>>>> session" element that sets the value to null.
>>>>>>>>>>>
>>>>>>>>>>> To be on the safe side, you probably want state that is
>>>>>>>>>>> automatically purged after a while. I would look into using Windows 
>>>>>>>>>>> for
>>>>>>>>>>> that. The triggers there are flexible so you can schedule both 
>>>>>>>>>>> actions on
>>>>>>>>>>> elements plus cleanup after a certain time delay (clock time or 
>>>>>>>>>>> event time).
>>>>>>>>>>>
>>>>>>>>>>> The question about "state expiry" has come a few times. People
>>>>>>>>>>> seem to like working on state directly, but it should clean up
>>>>>>>>>>> automatically.
>>>>>>>>>>>
>>>>>>>>>>> Can you see if your use case fits onto windows, otherwise open a
>>>>>>>>>>> ticket for state expiry?
>>>>>>>>>>>
>>>>>>>>>>> Greetings,
>>>>>>>>>>> Stephan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <ni...@basjes.nl>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm working on a streaming application that ingests clickstream
>>>>>>>>>>>> data.
>>>>>>>>>>>> In a specific part of the flow I need to retain a little bit of
>>>>>>>>>>>> state per visitor (i.e. keyBy(sessionid) )
>>>>>>>>>>>>
>>>>>>>>>>>> So I'm using the Key/Value state interface (i.e. OperatorState<
>>>>>>>>>>>> MyRecord>) in a map function.
>>>>>>>>>>>>
>>>>>>>>>>>> Now in my application I expect to get a huge number of sessions
>>>>>>>>>>>> per day.
>>>>>>>>>>>> Since these sessionids are 'random' and become unused after the
>>>>>>>>>>>> visitor leaves the website over time the system will have seen 
>>>>>>>>>>>> millions of
>>>>>>>>>>>> those sessionids.
>>>>>>>>>>>>
>>>>>>>>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>>>
>>>>>>>>>>>> Niels Basjes
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>
>>>>>>>>>> Niels Basjes
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>
>>>>>>>> Niels Basjes
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>
>>>>>> Niels Basjes
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>

Reply via email to