Till, Fabian,

Looping back after a gap on this. For some reason this looks like a need very specific to us (I would have thought it would be of interest to others as well). We on-boarded one of our new IoT data sources, and our total checkpoint size is now over 1TB at a 5-second checkpoint interval - and that is with delta (incremental) state enabled (I explained previously how transient the states are). I sincerely don't see the need for all of this checkpointing, especially given the tolerance we have for a little loss/duplication and the fact that we are going to on-board a few more data sources at this scale. We switched over to local SSDs on our cluster just to keep this use case from destroying our HDFS :)
Of course it is easier said than done, but an event-based checkpoint (e.g. being able to checkpoint when a RESTART happens, etc.) as discussed below would be great.

Thanks, Ashish

> On Jun 15, 2018, at 10:28 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
>
> Hi Till, Fabian,
>
> Thanks for your responses again.
>
> Till, you have nailed it. I will comment on the points individually. But first, I feel like I am still not stating the need well enough. Maybe I am overthinking :)
>
> So let me try one more time, with the preface that we are talking about millions of sensors reporting logs/metrics. In my cluster we can potentially have 10s if not 100s of such apps for a variety of data. I currently have 1 app in Prod, so I can do a lot of testing :) Just as a test, I enabled the RocksDB state backend and checkpointing every 5 seconds with Graphite metrics enabled. On average I could see almost 25GB of total state being written across a couple of hundred slots based on the Graphite numbers - it is set up with incremental and async checkpoints. I assume the main reason is that the states are transient, so the deltas are essentially the entire set of new states. Our main concern is real-time processing rather than zero data loss - we can even tolerate a few duplicates. To Fabian's point, at-least-once vs exactly-once semantics are also not of utmost concern at this point. Now, the bottom line is that I have checkpointing disabled and use the MemoryStateBackend, on the thought that writing massive states to persistence every few seconds didn't seem like the best use of resources - I'd rather fit more of these apps into the cluster and use stateful processing for the apps that really need it. However, this leads to 2 main issues:
>
> 1- If an operator fails (let's say Kafka timeouts), the entire job graph restarts, which leads to a larger-than-desirable gap in the data (lost states across 100s of operators) as there is obviously no recoverable state
> 2- The same issue happens on planned restarts
>
> Per Fabian's suggestion, I am going to try the RocksDB state backend with local drives and run some baseline tests - hoping states are kept in memory for the most part unless spillover is needed. That should at least give us a decent solution for (2). I am still not convinced we should enable periodic checkpointing (perhaps I am wrong here, but I have highlighted my reasons above).
>
> "
>> Just for my own understanding: What do you want to do with event-based checkpointing triggers (onRestart, onShutdown?). Do you want to draw a partial snapshot which should then be used for recovery? Technically, this is possible but I'm not sure how much value this would add for Flink users. A partial snapshot could also be completely empty (equivalent of disabling checkpointing).
>
>> I can see the point of making the checkpoint triggering more flexible and giving some control to the user. In contrast to savepoints, checkpoints are considered for recovery. My question here would be, what would be the triggering condition in your case (other than time)?
> "
> I'd think the trigger condition would be based on a life-cycle hook like RESTART (or perhaps even an external message once FLINK-6131 is available). A partial (best possible) snapshot is exactly what it would be - states from failing operators obviously cannot be expected to be recoverable.
>
>> What the community will add very soon is an atomic stop with savepoint call which will take a savepoint of your job's state and shut it down.
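
For concreteness, the 5-second incremental RocksDB checkpoint test described earlier in this message comes down to roughly the following setup. This is a minimal sketch, assuming a Flink 1.5-era job with the flink-statebackend-rocksdb dependency on the classpath; the checkpoint URI and the at-least-once mode are illustrative assumptions, not the exact production configuration discussed here.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // RocksDB state backend; 'true' enables incremental checkpoints, which upload
            // only changed SST files. With state that lives only a few seconds, the delta
            // is effectively the whole working set, which matches the numbers reported above.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

            // 5-second checkpoint interval, as in the test above; RocksDB snapshots run
            // asynchronously, and at-least-once mode skips barrier alignment since
            // exactly-once is not a hard requirement here.
            env.enableCheckpointing(5_000);
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

            // ... build the actual pipeline and call env.execute(...) here ...
        }
    }
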
>
> Very nice! Would this also have the same need to use the Fs or RocksDB state backend? It shouldn't be an issue for us as long as my tests above turn out to be decent.
>
> Thanks again guys for your advice and feedback. Really appreciate it.
>
> — Ashish
>
>
>> On Jun 15, 2018, at 5:43 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi,
>>
>> Ideally we would not have to cancel all tasks and redeploy the whole job in case of a restart. Instead we should do what you've outlined: redeploy the failed tasks and reset the state of all other running tasks. At the moment, however, this is not yet possible. It should eventually be addressed as part of improving Flink's recovery behavior.
>>
>> Just for my own understanding: What do you want to do with event-based checkpointing triggers (onRestart, onShutdown?). Do you want to draw a partial snapshot which should then be used for recovery? Technically, this is possible but I'm not sure how much value this would add for Flink users. A partial snapshot could also be completely empty (equivalent of disabling checkpointing).
>>
>> I can see the point of making the checkpoint triggering more flexible and giving some control to the user. In contrast to savepoints, checkpoints are considered for recovery. My question here would be, what would be the triggering condition in your case (other than time)?
>>
>> What the community will add very soon is an atomic stop with savepoint call which will take a savepoint of your job's state and shut it down.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 14, 2018 at 4:55 PM Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi Ashish,
>>
>> (I think) I understand your requirements, and the approach of just keeping non-failing tasks running is intuitively a good idea. However, this is only an option for use cases that are OK with at-least-once semantics (otherwise, we'd need to reset the state of the still-running tasks and hence take checkpoints). Moreover, the distributed task coordination for keeping some tasks running, restarting others, and connecting them is obviously more difficult than "just" canceling the whole job and starting it again.
>>
>> I have to admit that I'm not that familiar with Flink's distributed task coordination. Till in CC knows much more about that. However, I think the question here is how many use cases would benefit from a recovery mode with at-most-once state guarantees and how much implementation effort it would be to support it.
>>
>> Regarding the savepoints: if you are using the MemoryStateBackend, failure at too large a state size is expected since all state is replicated into the JobManager JVM. Did you try the FsStateBackend? It also holds the state on the TaskManager heap but backs it up to a (distributed) filesystem.
>>
>> Best, Fabian
>>
>> 2018-06-14 4:18 GMT+02:00 Ashish Pokharel <ashish...@yahoo.com>:
>> Hi Fabian,
>>
>> Thanks for the prompt response and apologies for the delayed response.
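
As a point of reference, switching a job from the MemoryStateBackend to the FsStateBackend that Fabian suggests above is roughly a one-line change. This is a minimal sketch; the checkpoint URI is an illustrative assumption.

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FsBackendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Working state stays on the TaskManager heap (as with MemoryStateBackend),
            // but checkpoints and savepoints go to a (distributed) filesystem instead of
            // the JobManager heap, so they are not capped by the MemoryStateBackend's
            // small default state size limit.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

            // ... build the actual pipeline and call env.execute(...) here ...
        }
    }
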
>>
>> You wrapped up the bottom line pretty well - if I were to wrap it up myself, I'd say "best possible" recovery on "known" restarts, whether manual cancel + start or framework-initiated ones like operator failures, with these constraints:
>> - some data loss is OK
>> - avoid periodic checkpoints, as states are really transient (less than 5 seconds of lifetime, if not milliseconds) and almost all events make it into state. I do understand that checkpointing performance has drastically improved and that, with the async and RocksDB options, it should technically not add latency to the application. However, I feel like even with those improvements and local checkpointing (which we are already doing), it is a lot of "unused" IOPS/resource utilization, especially if we start to spin up more apps handling similar data sources with similar requirements. At first blush it feels like those resources are better utilized in the cluster for apps with stricter SLAs for data loss and recovery.
>>
>> Basically, I suppose I am thinking of a checkpointing feature that is initiated by certain actions/events rather than a periodic one. Let me know if I am off-base here and I should just enable checkpointing in all of these apps and move on :)
>>
>> I tried a savepoint again, and it looks like the issue is caused by the in-memory states being large - it throws an error that the states are larger than a certain size. So a solution for (1) will possibly solve (2) as well.
>>
>> Thanks again,
>>
>> Ashish
>>
>>
>>> On Jun 7, 2018, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Ashish,
>>>
>>> Thanks for the great write-up.
>>> If I understood you correctly, there are two different issues caused by the disabled checkpointing:
>>>
>>> 1) Recovery from a failure without restarting all operators, to preserve the state in the running tasks
>>> 2) Planned restarts of an application without losing all state (even with disabled checkpointing).
>>>
>>> Ad 1) The community is constantly working on reducing the time for checkpointing and recovery.
>>> For 1.5, local task recovery was added, which basically stores a state copy on the local disk that is read in case of a recovery. So, tasks are restarted but restore their state from the local disk instead of from distributed storage. AFAIK, this can only be used together with remote checkpoints. I think this might be an interesting option for you if it were possible to write checkpoints only to local disk and not to remote storage. AFAIK, there are also other efforts to reduce the number of restarted tasks in case of a failure. I guess you've played with other features such as the RocksDBStateBackend and incremental and async checkpoints already.
>>>
>>> Ad 2) It sounds as if savepoints are exactly the feature you are looking for. It would be good to know what exactly did not work for you. The MemoryStateBackend is not suitable for large state sizes because it backs up into the heap memory of the JobManager.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-05 21:57 GMT+02:00 ashish pok <ashish...@yahoo.com>:
>>> Fabian, Stephan, All,
>>>
>>> I started a discussion a while back around having a form of event-based checkpointing policy that would help us in some of our high-volume data pipelines. Here is an effort to put this in front of the community, to understand what capabilities can support these types of use cases, gauge how much others feel the same need, and potentially turn it into a feature that can make it to a user story.
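
To make the kind of event-based trigger discussed in this thread a bit more concrete, here is a purely hypothetical sketch of what such life-cycle hooks might look like. No interface of this kind exists in Flink; all names below are made up for illustration only.

    // Hypothetical only - illustrates the event-based checkpoint triggers discussed
    // in this thread; nothing like this exists in Flink's public API.
    public interface CheckpointLifecycleHooks {

        // called before a planned cancel/stop, so one final (savepoint-like)
        // snapshot can be drawn before in-memory state is discarded
        void onShutdown(SnapshotTrigger trigger);

        // called before the job graph is restarted after a task failure; the
        // resulting snapshot would be partial ("best effort"), excluding the
        // state of the failed tasks
        void onRestart(SnapshotTrigger trigger);

        // minimal stand-in for whatever handle the framework would hand to the hooks
        interface SnapshotTrigger {
            void triggerSnapshot();
        }
    }
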
>>>
>>> Use Case Summary:
>>> - Extremely high volume of data (events from consumer devices, with a customer base of over 100M)
>>> - Multiple events need to be combined using a windowed streaming app, grouped by keys (something like a 5-minute floor of the timestamp plus unique identifiers for customer devices)
>>> - "Most" events for a group/key arrive within a few seconds if not milliseconds, but events can sometimes be delayed or get lost in transport (so delayed-event handling and timeouts will be needed)
>>> - Extremely low data loss is acceptable (pretty vague, but hopefully the details below clarify it)
>>> - Because of the volume and the transient nature of the source, checkpointing is turned off (this saves writes to persistence, as states/sessions are active for only a few seconds during processing)
>>>
>>> Problem Summary:
>>> Of course, none of the above is out of the norm for Flink, and as a matter of fact we already have a Flink app doing this. The issue arises with graceful shutdowns and operator failures (e.g. Kafka timeouts). On an operator failure, the entire job graph restarts, which essentially flushes out in-memory states/sessions. I think there is a feature in the works (not sure if it made it into 1.5) to perform selective restarts, which would limit the damage but still result in data loss. It also doesn't help when application restarts are needed. We did try the savepoint route for explicit restart needs, but I think the MemoryStateBackend ran into issues for larger states or something along those lines (not certain). We obviously cannot recover an operator that actually fails, because its own state could be unrecoverable. However, it feels like Flink already has a lot of the plumbing needed for the overall problem of keeping some sort of recoverable state to handle graceful shutdowns and restarts with minimal data loss.
>>>
>>> Solutions:
>>> Some in the community commented on my last email with decent ideas, like having an event-based checkpointing trigger (on shutdown, on restart, etc.) or life-cycle hooks (onCancel, onRestart, etc.) in Functions that can be implemented if this type of behavior is needed.
>>>
>>> Appreciate feedback from the community on how useful this might be for others, and from the core contributors on their thoughts as well.
>>>
>>> Thanks in advance, Ashish
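
For context on the job shape described in the use case summary above, here is a minimal sketch of a keyed 5-minute event-time window with some tolerance for late events. The (deviceId, timestamp) tuple type, the watermark bound, the lateness value, and the placeholder aggregation are all illustrative assumptions rather than details from this thread.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class DeviceWindowSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // (deviceId, timestampMillis) pairs stand in for the real sensor events
            DataStream<Tuple2<String, Long>> events = env.fromElements(
                    Tuple2.of("device-1", 1_000L),
                    Tuple2.of("device-1", 2_000L),
                    Tuple2.of("device-2", 1_500L));

            events
                    // tolerate a few seconds of transport delay before closing windows
                    .assignTimestampsAndWatermarks(
                            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> event) {
                                    return event.f1;
                                }
                            })
                    .keyBy(0)                                             // group by device id
                    .window(TumblingEventTimeWindows.of(Time.minutes(5))) // "5 min floor of timestamp"
                    .allowedLateness(Time.seconds(30))                    // keep windows around for stragglers
                    .sum(1)                                               // placeholder aggregation
                    .print();

            env.execute("device window sketch");
        }
    }
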