Wait a sec, I just checked out the code again and it seems we already do that:
https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210

If there were some checkpoints but none of them could be read, we fail recovery.
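For readers following along, here is a simplified, hypothetical sketch of that rule (illustrative names and types only, not the actual ZooKeeperCompletedCheckpointStore code): if the store lists checkpoint handles but none of them can be read back, recovery fails instead of silently starting from an empty state.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch: fail recovery when checkpoint handles exist but none can be read.
public final class RecoverySketch {

    public static <T> List<T> recoverAll(List<Callable<T>> storedHandles) throws Exception {
        List<T> recovered = new ArrayList<>();
        for (Callable<T> handle : storedHandles) {
            try {
                recovered.add(handle.call());
            } catch (Exception e) {
                // Leave the handle in place; it may become readable again (e.g. once HDFS is back).
                System.err.println("Could not retrieve checkpoint: " + e);
            }
        }
        if (recovered.isEmpty() && !storedHandles.isEmpty()) {
            throw new Exception("Found " + storedHandles.size()
                    + " checkpoint handles but could not read any of them;"
                    + " failing recovery instead of starting from scratch.");
        }
        return recovered;
    }
}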
> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> That sounds reasonable: we would keep the first fix, i.e. never delete checkpoints if they're "corrupt", only when they're subsumed. Additionally, we fail the job if there are some checkpoints in ZooKeeper but none of them can be restored, to prevent the case where a job starts from scratch even though it shouldn't.
>
> Does that sum it up?
>
>> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> If we hit the retry limit, abort the job. In our case we would restart from the last SP (we, like any production pipeline, take one n times a day), and I would think that should be OK for most folks?
>>
>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Thank you for considering this. If I understand you correctly:
>>
>> * A CHK pointer on ZK for a CHK state on HDFS was written successfully.
>> * Some issue restarted the pipeline.
>> * The NN was unfortunately down and Flink could not retrieve the CHK state from the CHK pointer on ZK.
>>
>> Before:
>>
>> * The CHK pointer was removed and the job started from a brand new slate.
>>
>> After (this fix on 1.4+):
>>
>> * Do not delete the CHK pointer (it has to be subsumed to be deleted).
>> * Flink keeps using this CHK pointer to restore state (if retry is > 0 and until we hit any retry limit).
>> * The NN comes back.
>> * Flink restores state on the next retry.
>>
>> I would hope that is the sequence to follow.
>>
>> Regards.
>>
>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi Vishal,
>>
>> I think you might be right. We fixed the problem that checkpoints were dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we still have the problem that if the DFS is not up at all then it will look as if the job is starting from scratch. However, the alternative is failing the job, in which case you will also never be able to restore from a checkpoint. What do you think?
>>
>> Best,
>> Aljoscha
>>
>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Sorry for the late reply.
>>>
>>> I created FLINK-8487 [1] to track this problem.
>>>
>>> @Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct? Please add more information if you think it is relevant.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>
>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>> Or this one:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>
>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> ping.
>>>
>>> This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.
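The "retry until the store comes back, then give up and fall back to a savepoint" behaviour described above can be approximated on the job side with a bounded restart strategy. A minimal sketch (the attempt count and delay are illustrative values, not recommendations):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative only: bound the number of recovery attempts so a temporary
// NameNode outage can pass, and let the job fail once the limit is hit so an
// operator can resume it from the last savepoint instead.
public final class RestartStrategySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                10,                 // restart attempts before giving up (illustrative)
                Time.minutes(2)));  // delay between attempts (illustrative)
        // ... define and execute the job as usual ...
    }
}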
>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once in production.
>>>
>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> Hi Vishal,
>>>
>>> Some relevant JIRA issues for you are:
>>>
>>> - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>>> - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>>> - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> it would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>>>>
>>>> Thanks for looking into this!
>>>>
>>>> Best regards,
>>>> Fabian
>>>>
>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>
>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>> Thank you for confirming.
>>>>
>>>> I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, because it essentially means that if the remote store is unavailable during a checkpoint then you have lost state (unless, of course, you have a retry of none or an unbounded retry delay, a delay within which you hope the store revives). Remember, the first retry failure will cause the job to start with new (blank) state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like "abort_retry_on_chkretrevalfailure".
>>>>
>>>> In our case it is very important that we do not undercount a window, one reason we use Flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).
>>>>
>>>> Please create a JIRA ticket for us to follow, or we could do it.
>>>>
>>>> PS: Not aborting on checkpoint failures, up to a configurable limit, is very important too.
>>>>
>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> Hi Vishal,
>>>>
>>>> I think you're right! And thanks for looking into this so deeply.
>>>>
>>>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>>>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>> I think this is the offending piece. There is a catch-all Exception which, IMHO, should distinguish a recoverable exception from an unrecoverable one.
>>>>>
>>>>> try {
>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>     if (completedCheckpoint != null) {
>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>     }
>>>>> } catch (Exception e) {
>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>             "checkpoint store.", e);
>>>>>
>>>>>     // remove the checkpoint with broken state handle
>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>> }
>>>>>
>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> So this is the issue; tell us if it is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the exact same checkpoint that had been successful before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from HDFS because the NN was down, and conveniently (and I think very wrongly) removed the CHK from consideration and cleaned the pointer (though even that failed, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure to retrieve it should have been a hard stop, not an exception to paper over before starting from a blank state.
>>>>>
>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>>>>
>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Also note that the ZooKeeper recovery directory (sadly on the same HDFS cluster) showed the same behavior. It holds the pointers to the checkpoint (I think that is what it does: keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>>>>
>>>>> -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>> -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>
>>>>> This is getting a little interesting. What say you :)
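To make concrete the distinction asked for in the 17:40 message above, here is a small hypothetical sketch (not the actual Flink code, and names are assumptions): a transient storage error, such as the NameNode being in safe mode, fails the recovery attempt so it can be retried later, and only a clearly broken handle would ever be skipped.

import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical illustration of separating recoverable from unrecoverable
// retrieval failures; names and structure are assumptions, not Flink APIs.
public final class RetrievalSketch {

    public static <T> T retrieveOrFail(Callable<T> stateHandle) throws Exception {
        try {
            return stateHandle.call();
        } catch (IOException transientError) {
            // Storage unavailable: keep the pointer and fail this recovery attempt
            // so the next retry can pick the checkpoint up once storage is back.
            throw new Exception("Checkpoint storage unavailable, retry recovery later.", transientError);
        } catch (Exception corruptHandle) {
            // The handle itself appears broken: returning null (skipping it) is the
            // only case where ignoring a checkpoint might be justified.
            return null;
        }
    }
}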
>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Another thing I noted was this:
>>>>>
>>>>> drwxr-xr-x   - root hadoop      0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>> drwxr-xr-x   - root hadoop      0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>
>>>>> Generally, what Flink does IMHO is replace the checkpoint directory with a new one; I see it happening now, every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, and hence chk-44286 is still there. That chk-44286 was (I think) the last checkpoint successfully created before the NN had issues, yet it was never deleted. It looks as if the job started with a blank slate?! Does this strike a chord?
>>>>>
>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Hello Fabian,
>>>>>
>>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).
>>>>>
>>>>> Coming back to this issue: we have the same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source (Kafka) is not applicable. I am totally certain that the retention time was not an issue: it is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.
>>>>>
>>>>> Note these properties and see if they strike a chord.
>>>>>
>>>>> * setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers is at the default, true. I bring this up to see whether Flink will under any circumstance drive consumption from the offset Kafka has recorded rather than the one in the checkpoint.
>>>>>
>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>
>>>>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>>
>>>>> * Other HDFS-related properties:
>>>>>
>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>
>>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>>
>>>>> Thank you for your patience and regards,
>>>>>
>>>>> Vishal
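On the first bullet, a minimal sketch of that consumer setting (assuming the Kafka 0.10 connector and Flink 1.4-era class locations, which vary slightly between versions; topic, broker address and group id are placeholders). With checkpointing enabled, the offsets committed back to Kafka are informational only; a restore is driven by the offsets stored in the checkpoint, not by the consumer group's offsets.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

// Sketch of the setCommitOffsetsOnCheckpoints(...) setting mentioned above.
public final class KafkaConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
        props.setProperty("group.id", "my-consumer-group");   // placeholder group

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        // Default is already true: offsets are committed back to Kafka on completed
        // checkpoints purely for monitoring; restores use the checkpointed offsets.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("kafka consumer sketch");
    }
}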
>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>> Hi Vishal,
>>>>>
>>>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>
>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually recovered successfully (i.e., without an error) from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.
>>>>>
>>>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>>>>
>>>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>> To add to it, my pipeline is a simple
>>>>>
>>>>> keyBy(0)
>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>
>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Hello folks,
>>>>>
>>>>> As far as I know, checkpoint failure should be ignored and retried, with potentially larger state. I had this situation:
>>>>>
>>>>> * HDFS went into safe mode because of NameNode issues
>>>>> * an exception was thrown:
>>>>>
>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>> ..................
>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>
>>>>> * The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.
>>>>>
>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either my Kafka consumer kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that held the partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.
>>>>>
>>>>> I am using RocksDB and of course have checkpointing turned on.
>>>>>
>>>>> The questions thus are:
>>>>>
>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>> * Why, on restart, was the operator state not recreated?
>>>>> * Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
>>>>> * And, though I am pretty sure: are operators like the window operator stateful by default, so that if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?
>>>>>
>>>>> Thanks.
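For the last question: as Fabian notes above, window operators are always stateful and their contents are part of every checkpoint. Below is a minimal, self-contained sketch of that kind of job with checkpointing enabled; the source, types and values are placeholders, it uses the simpler reduce(ReduceFunction) variant, and the event-time/allowed-lateness setup of the real job is omitted to keep it short.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

// Simplified sketch of the job shape discussed in this thread: a keyed
// 15-minute window with an incremental reduce and checkpointing enabled.
public final class WindowedCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every minute, as described in the thread
        // The thread's setup also uses the RocksDB state backend with an HDFS checkpoint
        // directory (flink-statebackend-rocksdb dependency); omitted here.

        env.fromElements(Tuple2.of("key", 1L), Tuple2.of("key", 2L))
                .keyBy(0)
                .timeWindow(Time.minutes(15))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        // Running count per key; this partial aggregate lives in window state.
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                })
                .print();

        env.execute("windowed count sketch");
    }
}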