If we hit the retry limit, abort the job. In our case we would then restart from the last SP ( savepoint ) — like any production pipeline, we take them several times a day — and I would think that should be OK for most folks?
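( For concreteness, the kind of setup this discussion assumes might look like the sketch below: a fixed-delay restart strategy with a bounded number of attempts, plus retained externalized checkpoints so a restart does not have to begin from a blank slate. This is a sketch only; the interval, attempt count, and job name are placeholders, not recommendations. )

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetryLimitSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint every minute with exactly-once semantics.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

            // Retain the externalized checkpoint on cancellation so a manual restart
            // can fall back to it if the last savepoint is stale.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // Bounded retry: after 10 failed attempts (2 minutes apart) the job aborts,
            // and we restart it manually from the last savepoint.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    10,
                    Time.of(2, TimeUnit.MINUTES)));

            // ... sources, windowing, sinks elided ...

            env.execute("retry-limit-sketch");
        }
    }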
On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Thank you for considering this. If I understand you correctly:
>
> * The CHK pointer on ZK for a CHK state on hdfs was written successfully.
> * Some issue restarted the pipeline.
> * The NN was unfortunately down, so Flink could not retrieve the CHK state from the CHK pointer on ZK.
>
> Before:
>
> * The CHK pointer was removed and the job started from a brand new slate.
>
> After ( this fix on 1.4+ ):
>
> * Do not delete the CHK pointer ( it has to be subsumed to be deleted ).
> * Flink keeps using this CHK pointer to restore state ( if retry is > 0 and until we hit any retry limit ).
> * NN comes back.
> * Flink restores state on the next retry.
>
> I would hope that is the sequence to follow.
>
> Regards.
>
> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Vishal,
>>
>> I think you might be right. We fixed the problem that checkpoints were dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we still have the problem that if the DFS is not up at all then it will look as if the job is starting from scratch. However, the alternative is failing the job, in which case you will also never be able to restore from a checkpoint. What do you think?
>>
>> Best,
>> Aljoscha
>>
>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Sorry for the late reply.
>>
>> I created FLINK-8487 [1] to track this problem.
>>
>> @Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct? Please add more information if you think it is relevant.
>>
>> Thanks,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>
>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Or this one:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>
>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> ping.
>>>>
>>>> This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.
>>>>
>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Folks, sorry for being late on this. Can somebody with knowledge of this code base create a jira issue for the above? We have seen this more than once on production.
>>>>>
>>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> Some relevant Jira issues for you are:
>>>>>>
>>>>>> - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>>>>>> - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>>>>>> - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> it would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information of your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>>>>>>
>>>>>> Thanks for looking into this!
>>>>>>
>>>>>> Best regards,
>>>>>> Fabian
>>>>>>
>>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>>
>>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>
>>>>>>> Thank you for confirming.
>>>>>>>
>>>>>>> I think this is a critical bug. In essence, any checkpoint store ( hdfs/S3/File ) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, because it essentially means that if the remote store is unavailable during a checkpoint then you have lost state ( unless of course you have a retry of none, or an unbounded retry delay that you *hope* the store revives in ). Remember, the very first retry failure will create new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like "abort_retry_on_chkretrevalfailure".
>>>>>>>
>>>>>>> In our case it is very important that we do not undercount a window ( one reason we use flink and its awesome failure guarantees ), as various alarms sound ( we do anomaly detection on the time series ).
>>>>>>>
>>>>>>> Please create a jira ticket for us to follow, or we could do it.
>>>>>>>
>>>>>>> PS Not aborting on checkpointing failures, up to a configurable limit, is very important too.
>>>>>>>
>>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>>
>>>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>>>
>>>>>>>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>>>>>>>>
>>>>>>>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour but there are plans to change this.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> I think this is the offending piece. There is a catch-all Exception which, IMHO, should distinguish a recoverable exception from an unrecoverable one.
>>>>>>>>
>>>>>>>> try {
>>>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>>>     if (completedCheckpoint != null) {
>>>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>>>     }
>>>>>>>> } catch (Exception e) {
>>>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>>>         "checkpoint store.", e);
>>>>>>>>     // remove the checkpoint with broken state handle
>>>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>>> }
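( To make the suggestion above concrete: the kind of distinction being asked for might look roughly like the sketch below. This is not the actual Flink code and not how FLINK-7783 was necessarily fixed — it simply reuses the names from the quoted snippet and treats an IOException as the "store temporarily unreachable" case, as an illustrative assumption. )

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (IOException e) {
        // The DFS (e.g. HDFS with the NameNode in safe mode / standby) is unreachable.
        // The handle is most likely intact, so fail this recovery attempt and keep the
        // pointer in ZooKeeper instead of discarding it.
        throw new FlinkException(
                "Could not read checkpoint " + checkpointStateHandle.f1
                        + " because the checkpoint storage is unavailable.", e);
    } catch (Exception e) {
        // Everything else is treated as a genuinely broken handle, as before.
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
                "checkpoint store.", e);
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }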
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> So this is the issue, and tell us if it is wrong. ZK had some state ( backed by hdfs ) that referred to a checkpoint ( the same exact checkpoint that had completed successfully before the NN screwed us ). When the JM tried to recreate the state, it failed to retrieve the CHK handle from hdfs because the NN was down, and conveniently ( and I think very wrongly ) removed the CHK from being considered and cleaned the pointer ( though the cleanup failed because the NN was down, as is obvious from the dangling file in recovery ). The metadata itself was on hdfs, and a failure to retrieve it should have been a stop-all exception, not an excuse to try magic and start from a blank state.
>>>>>>>>>
>>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Also note that the zookeeper recovery ( sadly on the same hdfs cluster ) showed the same behavior. It had the pointers to the checkpoint ( I think that is what it does, keeps metadata of where the checkpoint lives etc ). It too decided to keep the recovery file from the failed state.
>>>>>>>>>>
>>>>>>>>>> -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>> -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>>
>>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Another thing I noted was this:
>>>>>>>>>>>
>>>>>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>>>
>>>>>>>>>>> Generally what Flink does, IMHO, is that it replaces the chk point directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, chk-44286, which ( I think ) was the last checkpoint successfully created before the NN had issues. It looks as if the job started with a blank slate ???????? Does this strike a chord ?????
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>>
>>>>>>>>>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced.
>>>>>>>>>>>> With kafka as a source it negates issues GDF has ( GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time etc ).
>>>>>>>>>>>>
>>>>>>>>>>>> Coming back to this issue. We have that same kafka topic feeding a streaming druid datasource and we do not see any issue there, so data loss at the source, kafka, is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>>>
>>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>>
>>>>>>>>>>>> * setCommitOffsetsOnCheckpoints(boolean) for the kafka consumers is left at the default, true. I bring this up to see whether flink will in any circumstance drive consumption from the kafka-perceived offset rather than the one in the checkpoint.
>>>>>>>>>>>>
>>>>>>>>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM ( or ZK ? ). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> * We have a good sense of SP ( save point ) and CP ( checkpoint ) and certainly understand that they are not actually dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>>>>>>>>>
>>>>>>>>>>>> * Other hdfs-related properties:
>>>>>>>>>>>>
>>>>>>>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>>>>>
>>>>>>>>>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for your patience and regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Vishal
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>
>>>>>>>>>>>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>>>>>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As far as I know, checkpoint failure should be ignored and retried, with potentially larger state. I had this situation:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * hdfs went into safe mode b'coz of Name Node issues
>>>>>>>>>>>>>>> * an exception was thrown
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>> ..................
>>>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint failures, after the hdfs issues were resolved.
>>>>>>>>>>>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my kafka consumer that kept on advancing its offset between a start and the next checkpoint failure ( a minute's worth ), or the operator that had partial aggregates was lost. I have a 15 minute window of counts on a keyed operator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned on.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a save point work as expected?
>>>>>>>>>>>>>>> * And though I am pretty sure, are operators like the Window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), is the state managed by flink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks.
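( For reference, the job discussed above boils down to something like the following sketch: a Kafka source, keyed 15-minute windows reduced to counts, RocksDB state backend on HDFS, and checkpointing enabled. The topic name, consumer properties, key extraction, window size, and checkpoint path are placeholders; the two-argument reduce(ReduceFunction, WindowFunction) and the event-time/allowedLateness handling from the thread are simplified away for brevity. )

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class WindowedCountsSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // RocksDB state backend with checkpoints on HDFS, checkpointing every minute.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/placeholder"));
            env.enableCheckpointing(60_000);

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
            kafkaProps.setProperty("group.id", "placeholder-group");

            DataStream<Tuple2<String, Long>> events = env
                    .addSource(new FlinkKafkaConsumer010<>(
                            "placeholder-topic", new SimpleStringSchema(), kafkaProps))
                    .map(new MapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(String line) {
                            // Hypothetical parsing: use the raw line as the key, count 1 per event.
                            return Tuple2.of(line, 1L);
                        }
                    });

            events
                    .keyBy(0)
                    .timeWindow(Time.minutes(15))
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                           Tuple2<String, Long> b) {
                            return Tuple2.of(a.f0, a.f1 + b.f1); // running count per key
                        }
                    })
                    .print(); // the real job writes to a sink

            env.execute("windowed-counts-sketch");
        }
    }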