Till, Fabian,

Looping back on this after a gap. For some reason this looks like a need very
specific to us (I would have thought it would be of interest to others as
well). We onboarded one of our new IoT data sources, and our total checkpoint
size is now over 1TB with a checkpoint interval of 5 seconds - that is with
incremental (delta) checkpoints enabled (I explained previously how transient
the states are). I sincerely don't see the need for all of this, especially
given our tolerance for a little loss/duplication and the fact that we are
going to onboard a few more data sources at this scale. We switched over to
local SSDs on our cluster just to keep this use case from destroying our
HDFS :)
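
For reference, the setup behind those numbers looks roughly like the sketch
below (class name and paths are placeholders rather than our actual values,
and AT_LEAST_ONCE simply reflects our tolerance for a few duplicates):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 5 seconds; snapshots are taken asynchronously
        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);

        // second constructor argument enables incremental (delta) checkpoints;
        // checkpoint target and RocksDB working directory both point at local SSD
        RocksDBStateBackend backend =
            new RocksDBStateBackend("file:///mnt/ssd/flink/checkpoints", true);
        backend.setDbStoragePath("/mnt/ssd/flink/rocksdb");
        env.setStateBackend(backend);

        // ... sources, keyed windows and sinks as usual, then:
        // env.execute("iot-pipeline-sketch");
    }
}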

Of course it is easier said than done, but event-based checkpointing (e.g.
being able to checkpoint when a RESTART happens, etc.), as discussed below,
would be great.
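
To make that concrete, I am imagining something along the lines of the purely
hypothetical sketch below - nothing like this exists in Flink today, it is
only meant to illustrate the kind of trigger I have in mind:

// Purely hypothetical - this is not an existing Flink API, just an
// illustration of event-driven checkpoint triggers.
public interface CheckpointTriggerPolicy {

    // invoked when the JobManager is about to restart the job graph
    // (e.g. after an operator failure); returning true requests a
    // best-effort snapshot of the still-healthy tasks before the restart
    boolean onRestart();

    // invoked on a planned cancel/shutdown so state can be drained once
    // instead of being checkpointed periodically
    boolean onShutdown();
}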

Thanks, Ashish


> On Jun 15, 2018, at 10:28 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
> 
> Hi Till, Fabian,
> 
> Thanks for your responses again. 
> 
> Till, you have nailed it. I will comment on your points individually. But 
> first, I feel like I am still not stating the need well enough. Maybe I am 
> overthinking it :)
> 
> So let me try one more time, with a preface that we are talking about millions 
> of sensors reporting logs/metrics. So in my cluster we can potentially have 
> tens if not hundreds of such apps for a variety of data. I currently have 1 
> app in Prod, so I can do a lot of testing :) Just as a test, I enabled the 
> RocksDB state backend and checkpointing every 5 seconds with Graphite metrics 
> enabled. On average I could see almost 25GB of total state being written 
> across a couple of hundred slots based on the Graphite numbers - it is set up 
> with incremental and async checkpoints. I am assuming the main reason is that 
> the states are transient, so the deltas are essentially the entire set of new 
> states. Our main concern is real-time processing rather than zero data loss, 
> or even a few possible duplicates. To Fabian's point, at-least-once vs 
> exactly-once semantics are also not of utmost concern at this point. Now, the 
> bottom line is that I have checkpointing disabled and use the 
> MemoryStateBackend (sketched after the list below), with the thought that 
> writing massive states to persistence every few seconds didn't seem like the 
> best use of resources - I'd rather fit more of these apps into the cluster 
> and reserve stateful processing for the apps that really need it. However, 
> this leads to 2 main issues:
> 
> 1- If an operator fails (say, Kafka timeouts), the entire job graph restarts, 
> which leaves us with a larger-than-desirable gap in the data (lost states 
> across hundreds of operators), as there is obviously no recoverable state
> 2- The same issue happens on planned restarts
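> 
> To be explicit, the current production setup is essentially just the fragment 
> below (dropped into the job's main(), imports omitted) - with no 
> enableCheckpointing call and state held only on the heap, any restart of the 
> job graph starts from empty state:
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
> // no env.enableCheckpointing(...) at all; state lives only on the TaskManager
> // heap, so both failure recovery and planned restarts begin with empty state
> env.setStateBackend(new MemoryStateBackend());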
> 
> Per Fabian's suggestion, I am going to try the RocksDB state backend with 
> local drives and run some baseline tests - hoping states are kept in memory 
> for the most part unless spillover is needed. This should at least give us a 
> decent solution for (2). I am still not convinced we should enable periodic 
> checkpointing (perhaps I am wrong here, but again, I have highlighted my 
> reasons above).
> 
> "
>> Just for my own understanding: What do you want to do with event-based 
>> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a 
>> partial snapshot which should then be used for recovery? Technically, this 
>> is possible but I'm not sure how much value this would add for Flink users. 
>> A partial snapshot could also be completely empty (equivalent of disabling 
>> checkpointing).
> 
>> I can see the point of making the checkpoint triggering more flexible and 
>> giving some control to the user. In contrast to savepoints, checkpoints are 
>> considered for recovery. My question here would be, what would be the 
>> triggering condition in your case (other than time)?
> "
> I'd think the trigger condition would be based on a life-cycle hook like 
> RESTART (or perhaps even an external message once FLINK-6131 is available). 
> A partial (best possible) snapshot is exactly what it would be - states from 
> failing operators obviously cannot be expected to be recoverable.
> 
>> What the community will add very soon is an atomic stop-with-savepoint call, 
>> which will take a savepoint of your job's state and then shut the job down.
> 
> Very nice! Would this also have the same need to use the Fs or RocksDB state 
> backend? It shouldn't be an issue for us as long as my tests above turn out 
> to be decent.
> 
> Thanks again guys for your advice and feedback. Really appreciate it.
> 
> — Ashish
> 
>  
>> On Jun 15, 2018, at 5:43 AM, Till Rohrmann <trohrm...@apache.org 
>> <mailto:trohrm...@apache.org>> wrote:
>> 
>> Hi,
>> 
>> ideally we would not have to cancel all tasks and redeploy the whole job in 
>> case of a restart. Instead, we should do what you've outlined: redeploy the 
>> failed tasks and reset the state of all other running tasks. At the moment, 
>> however, this is not yet possible. It should eventually be addressed as part 
>> of improving Flink's recovery behavior.
>> 
>> Just for my own understanding: What do you want to do with event-based 
>> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a 
>> partial snapshot which should then be used for recovery? Technically, this 
>> is possible but I'm not sure how much value this would add for Flink users. 
>> A partial snapshot could also be completely empty (equivalent of disabling 
>> checkpointing).
>> 
>> I can see the point of making the checkpoint triggering more flexible and 
>> giving some control to the user. In contrast to savepoints, checkpoints are 
>> considered for recovery. My question here would be, what would be the 
>> triggering condition in your case (other than time)?
>> 
>> What the community will add very soon is an atomic stop-with-savepoint call, 
>> which will take a savepoint of your job's state and then shut the job down.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Jun 14, 2018 at 4:55 PM Fabian Hueske <fhue...@gmail.com 
>> <mailto:fhue...@gmail.com>> wrote:
>> Hi Ashish,
>> 
>> (I think) I understand your requirements, and the approach of just keeping 
>> non-failing tasks running is intuitively a good idea.
>> However, this is only an option for use cases that are OK with at-least-once 
>> semantics (otherwise, we'd need to reset the state of the still running 
>> tasks and hence take checkpoints). 
>> Moreover, the distributed task coordination for keeping some tasks running, 
>> restarting others, and connecting them is obviously more difficult than 
>> "just" canceling the whole job and starting it again.
>> 
>> I have to admit that I'm not that familiar with Flink's distributed task 
>> coordination. Till in CC knows much more about that.
>> However, I think the question here is how many use cases would benefit from 
>> a recovery mode with at-most-once state guarantees, and how much 
>> implementation effort it would take to support it.
>> 
>> Regarding the savepoints: if you are using the MemoryStateBackend, failures 
>> for large state sizes are expected, since all state is replicated into the 
>> JobManager JVM. 
>> Did you try to use the FsStateBackend? It also holds the state on the 
>> TaskManager heap but backs it up to a (distributed) filesystem.
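>> 
>> In code the switch is roughly the following (inside the job's main(), imports 
>> omitted; the checkpoint URI is just a placeholder):
>> 
>> // MemoryStateBackend snapshots into the JobManager heap and caps the size of
>> // each serialized state (5 MB by default), which is likely the limit you hit:
>> //   env.setStateBackend(new MemoryStateBackend());
>> 
>> // FsStateBackend keeps the working state on the TaskManager heap but writes
>> // snapshots and savepoints to a (distributed) filesystem:
>> env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));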
>> 
>> Best, Fabian
>> 
>> 2018-06-14 4:18 GMT+02:00 Ashish Pokharel <ashish...@yahoo.com 
>> <mailto:ashish...@yahoo.com>>:
>> Hi Fabian,
>> 
>> Thanks for the prompt response, and apologies for my delayed reply. 
>> 
>> You wrapped up the bottom line pretty well - if I were to wrap it up, I'd 
>> say "best possible" recovery on "known" restarts, whether a manual cancel + 
>> start or framework-initiated ones like operator failures, with these 
>> constraints:
>>  - some data loss is OK
>>  - avoid periodic checkpoints, as the states are really transient (less than 
>> 5 seconds of lifetime, if not milliseconds) and almost all events make it 
>> into state. I do understand that checkpointing performance has improved 
>> drastically and that, with the async and RocksDB options, it should 
>> technically not add latency to the application. However, I feel like even 
>> with those improvements and local checkpointing (which we are already doing) 
>> it is a lot of "unused" IOPS/resource utilization, especially if we start to 
>> spin up more apps handling similar data sources with similar requirements. 
>> At first blush it feels like those resources are better utilized in the 
>> cluster for apps with stricter SLAs for data loss and recovery.
>> 
>> Basically, I suppose I am thinking of a checkpointing feature that is 
>> triggered by certain actions/events rather than periodically. Let me know if 
>> I am off base here and should just enable checkpointing in all of these apps 
>> and move on :) 
>> 
>> I tried a savepoint again, and it looks like the issue is caused by the 
>> in-memory states being large - it throws an error that the states are larger 
>> than a certain size. So the solution to (1) will possibly solve (2) as well. 
>> 
>> Thanks again,
>> 
>> Ashish
>> 
>> 
>>> On Jun 7, 2018, at 4:25 PM, Fabian Hueske <fhue...@gmail.com 
>>> <mailto:fhue...@gmail.com>> wrote:
>>> 
>>> Hi Ashish,
>>> 
>>> Thanks for the great write up. 
>>> If I understood you correctly, there are two different issues that are 
>>> caused by the disabled checkpointing.
>>> 
>>> 1) Recovery from a failure without restarting all operators to preserve the 
>>> state in the running tasks
>>> 2) Planned restarts of an application without losing all state (even with 
>>> checkpointing disabled).
>>> 
>>> Ad 1) The community is constantly working on reducing the time for 
>>> checkpointing and recovery. 
>>> For 1.5, local task recovery was added, which basically stores a state copy 
>>> on the local disk, which is read in case of a recovery. So, tasks are 
>>> restarted, but they read the state to restore from the local disk instead 
>>> of from distributed storage.
>>> AFAIK, this can only be used together with remote checkpoints. I think this 
>>> might be an interesting option for you if it would be possible to write 
>>> checkpoints only to local disk and not remote storage. AFAIK, there are 
>>> also other efforts to reduce the number of restarted tasks in case of a 
>>> failure. I guess, you've played with other features such as 
>>> RocksDBStateBackend, incremental and async checkpoints already. 
>>> 
>>> Ad 2) It sounds as if savepoints are exactly the feature you are looking 
>>> for. It would be good to know what exactly did not work for you. The 
>>> MemoryStateBackend is not suitable for large state sizes because it backs 
>>> up state into the heap memory of the JobManager. 
>>> 
>>> Best, Fabian
>>> 
>>> 2018-06-05 21:57 GMT+02:00 ashish pok <ashish...@yahoo.com 
>>> <mailto:ashish...@yahoo.com>>:
>>> Fabian, Stephan, All,
>>> 
>>> I started a discussion a while back around having a form of event-based 
>>> checkpointing policy that would help us in some of our high-volume data 
>>> pipelines. Here is an effort to put this in front of the community and 
>>> understand what capabilities can support these types of use cases, how much 
>>> others feel the same need, and whether this could potentially become a 
>>> feature that makes it into a user story.
>>> 
>>> Use Case Summary:
>>> - Extremely high volume of data (events from consumer devices with a 
>>> customer base of over 100M)
>>> - Multiple events need to be combined using a windowing streaming app, 
>>> grouped by key (something like the 5-minute floor of the timestamp plus a 
>>> unique identifier for the customer device); a rough sketch of this topology 
>>> follows the list
>>> - "Most" events for a group/key arrive within a few seconds if not 
>>> milliseconds; however, events can sometimes be delayed or get lost in 
>>> transport (so delayed-event handling and timeouts will be needed)
>>> - Extremely low data loss is acceptable (pretty vague, but hopefully the 
>>> details below clarify it)
>>> - Because of the volume and the transient nature of the source, 
>>> checkpointing is turned off (this saves on writes to persistence, as 
>>> states/sessions are active for only a few seconds during processing)
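>>> 
>>> A rough sketch of that topology is below (Tuple2 and the in-line elements 
>>> stand in for the real Kafka source and event type, and the lateness values 
>>> are only placeholders):
>>> 
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> 
>>> public class DeviceWindowSketch {
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>         // checkpointing deliberately not enabled, as described above
>>> 
>>>         // Tuple2<deviceId, eventTimeMillis> stands in for the real device event
>>>         DataStream<Tuple2<String, Long>> events = env
>>>             .fromElements(Tuple2.of("device-1", 0L), Tuple2.of("device-1", 1000L))
>>>             .assignTimestampsAndWatermarks(
>>>                 new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(30)) {
>>>                     @Override
>>>                     public long extractTimestamp(Tuple2<String, Long> e) {
>>>                         return e.f1;
>>>                     }
>>>                 });
>>> 
>>>         events
>>>             .keyBy(0)                                             // unique device identifier
>>>             .window(TumblingEventTimeWindows.of(Time.minutes(5))) // the "5 min floor of timestamp" grouping
>>>             .allowedLateness(Time.minutes(1))                     // tolerate stragglers; truly lost events time out
>>>             .reduce((a, b) -> a.f1 >= b.f1 ? a : b)               // placeholder for the real combine logic
>>>             .print();
>>> 
>>>         env.execute("device-window-sketch");
>>>     }
>>> }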
>>> 
>>> Problem Summary:
>>> Of course, none of the above is out of the norm for Flink, and as a matter 
>>> of fact we already have a Flink app doing this. The issue arises when it 
>>> comes to graceful shutdowns and operator failures (e.g. Kafka timeouts). 
>>> On operator failures, the entire job graph restarts, which essentially 
>>> flushes out the in-memory states/sessions. I think there is a feature in 
>>> the works (not sure if it made it to 1.5) to perform selective restarts, 
>>> which will limit the damage but still result in data loss. It also doesn't 
>>> help when application restarts are needed. We did try going the savepoint 
>>> route for explicit restart needs, but I think the MemoryStateBackend ran 
>>> into issues with larger states or something along those lines (not 
>>> certain). We obviously cannot recover an operator that actually fails, 
>>> because its own state could be unrecoverable. However, it feels like Flink 
>>> already has a lot of the plumbing to help with the overall problem of 
>>> allowing some sort of recoverable state to handle graceful shutdowns and 
>>> restarts with minimal data loss.
>>> 
>>> Solutions:
>>> Some in community commented on my last email with decent ideas like having 
>>> an event-based checkpointing trigger (on shutdown, on restart etc) or 
>>> life-cycle hooks (onCancel, onRestart etc) in Functions that can be 
>>> implemented if this type of behavior is needed etc. 
>>> 
>>> Appreciate feedback from the community on how useful this might be for 
>>> others, and thoughts from the core contributors as well.
>>> 
>>> Thanks in advance, Ashish
>>> 
>>> 
>> 
>> 
> 
