Hi Raman,

Checkpoints are used to recover from task or process failures. If checkpointing is configured, checkpoints are taken automatically at periodic intervals. A checkpoint is usually removed once a more recent checkpoint has completed (the exact retention policy can be configured).
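For reference, enabling periodic checkpointing with the RocksDB state backend in the Java API looks roughly like the sketch below. The interval and checkpoint path are placeholder values, not recommendations, and the flink-statebackend-rocksdb dependency is assumed to be on the classpath:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60_000);

        // Keep state in RocksDB; the checkpoint URI below is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... define sources, transformations, and sinks here ...

        env.execute("checkpointed-job");
    }
}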
Savepoints are used to restart a job that was previously shut down, to migrate a job to another cluster (e.g., when upgrading Flink), to update the job itself, etc. So they are more for planned maintenance. Nonetheless, they can also be used for more coarse-grained fault tolerance, and it is common practice to trigger a savepoint periodically. These blog posts might be helpful to understand the potential of savepoints [1] [2].

Best,
Fabian

[1] http://data-artisans.com/turning-back-time-savepoints/
[2] http://data-artisans.com/savepoints-part-2-updating-applications/

2017-01-19 19:02 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:

> I was able to get it working well with the original approach you described. Thanks! Note that the documentation on how to do this with the Java API is... sparse, to say the least. I was able to look at the implementation of the Scala flatMapWithState function as a starting point.
>
> Now I'm trying to understand all the operational concerns related to the stored state. My checkpoints are in RocksDB, configured via the job definition.
>
> It seems that the checkpointed state of the streaming job is lost when I stop and restart Flink normally, or when Flink terminates abnormally and is restarted. I was able to take an explicit savepoint and then restart the job with it.
>
> Is the correct approach as of now to take savepoints periodically via cron, and use those to re-run jobs in case of Flink failure or restart?
>
> Regards,
> Raman
>
> On 19/01/17 05:43 AM, Fabian Hueske wrote:
>
> Hi Raman,
>
> I think you would need a sliding count window of size 2 with slide 1. This is basically a GlobalWindow with a special trigger.
>
> However, you would need to modify the custom trigger to be able to
> - identify a terminal event (if there is such a thing), or to
> - close the window after a certain period of inactivity to clean up the state.
>
> Best,
> Fabian
>
> 2017-01-19 1:43 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>
>> Thank you for your reply.
>>
>> If I were to use a keyed stream with a count-based window of 2, would Flink keep the last state persistently until the next state is received? Would this be another way of having Flink keep this information persistently without having to implement it manually?
>>
>> Thanks,
>> Raman
>>
>> On 18/01/17 11:22 AM, Fabian Hueske wrote:
>> > Hi Raman,
>> >
>> > I would approach this issue as follows.
>> >
>> > You key the input stream on the sourceId and apply a stateful FlatMapFunction. The FlatMapFunction has key-partitioned state and stores for each key (sourceId) the latest event as state. When a new event arrives, you can compute the time spent in the last state by comparing the stored event with the newly received event. Then you put the new event into the state (see the sketch appended at the end of this thread).
>> >
>> > This solution works well if you have a finite number of sources or if you have a terminal event that signals that no more events will arrive for a key. Otherwise, the number of events stored in the state will grow infinitely and eventually become a problem.
>> >
>> > If the number of sources increases, you need to evict data at some point in time. A ProcessFunction can help here, because you can register a timer which you can use to evict old state.
>> >
>> > Hope this helps,
>> > Fabian
>> >
>> > 2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketra...@gmail.com>:
>> >
>> > I am investigating Flink.
>> > I am considering a relatively simple use case -- I want to ingest streams of events that are essentially timestamped state changes. These events may look something like:
>> >
>> > {
>> >   sourceId: 111,
>> >   state: OPEN,
>> >   timestamp: <date/time>
>> > }
>> >
>> > I want to apply various processing to these state change events, the output of which can be used for analytics. For example:
>> >
>> > 1. average time spent in state, by state
>> > 2. sources with longest (or shortest) time spent in OPEN state
>> >
>> > The time spent in each state may be days or even weeks.
>> >
>> > All the examples I have seen of similar logic involve windows on the order of 15 minutes. Since time spent in each state may far exceed these window sizes, I'm wondering what the best approach will be.
>> >
>> > One thought from reading the docs is to use `every` to operate on the entire stream. But it seems like this will take longer and longer to run as the event stream grows, so this is not an ideal solution. Or does Flink apply some clever optimizations to avoid the potential performance issue?
>> >
>> > Another thought was to split the event stream into multiple streams by source, each of which will have a small (and limited) amount of data. This will make processing each stream simpler, but since there can be thousands of sources, it will result in a lot of streams to handle and persist (probably in Kafka). This does not seem ideal either.
>> >
>> > It seems like this should be simple, but I'm struggling with understanding how to solve it elegantly.
>> >
>> > Regards,
>> > Raman
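As referenced above, here is a minimal sketch of the keyed-state FlatMapFunction approach Fabian describes. The StateChangeEvent and TimeInState types, their field names, and the state name are illustrative assumptions, not part of the original discussion:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical input and output types for illustration only.
class StateChangeEvent {
    public long sourceId;
    public String state;
    public long timestamp;
}

class TimeInState {
    public long sourceId;
    public String state;
    public long millisInState;

    public TimeInState() {}

    public TimeInState(long sourceId, String state, long millisInState) {
        this.sourceId = sourceId;
        this.state = state;
        this.millisInState = millisInState;
    }
}

// Applied to a stream keyed on sourceId: remembers the latest event per key
// and emits the time spent in the previous state whenever a new event arrives.
public class TimeInStateFunction
        extends RichFlatMapFunction<StateChangeEvent, TimeInState> {

    private transient ValueState<StateChangeEvent> lastEvent;

    @Override
    public void open(Configuration parameters) {
        lastEvent = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-event", StateChangeEvent.class));
    }

    @Override
    public void flatMap(StateChangeEvent current, Collector<TimeInState> out) throws Exception {
        StateChangeEvent previous = lastEvent.value();
        if (previous != null) {
            // Time spent in the previous state is the gap between the two timestamps.
            out.collect(new TimeInState(
                    previous.sourceId, previous.state, current.timestamp - previous.timestamp));
        }
        // Remember the latest event for this key.
        lastEvent.update(current);
    }
}

It would be applied roughly as events.keyBy("sourceId").flatMap(new TimeInStateFunction()). As Fabian notes above, if the set of sources is unbounded, the same logic could instead live in a ProcessFunction that registers a timer per key to clear the stored event after a period of inactivity.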