Sorry for the late reply. I created FLINK-8487 [1] to track this problem.
@Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct? Please add more information if you think it is relevant.

Thanks, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8487

2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> Or this one
>
> https://issues.apache.org/jira/browse/FLINK-4815
>
> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> ping.
>>
>> This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.
>>
>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once on production.
>>>
>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> Some relevant JIRA issues for you are:
>>>>
>>>> - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>>>> - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>>>> - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> it would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>>>>
>>>> Thanks for looking into this!
>>>>
>>>> Best regards,
>>>> Fabian
>>>>
>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>
>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> Thank you for confirming.
>>>>>
>>>>> I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, b'coz essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless of course you have a retry of none or an unbounded retry delay, a delay that you *hope* the store revives in). Remember, the first retry failure will cause new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like "abort_retry_on_chkretrevalfailure".
>>>>>
>>>>> In our case it is very important that we do not undercount a window, one reason we use Flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).
>>>>>
>>>>> Please create a JIRA ticket for us to follow, or we could do it.
>>>>>
>>>>> PS: Not aborting on checkpoint failure, up to a configurable limit, is very important too.
>>>>>
>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>
>>>>>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint, it might have been possible to restore it at a later point, right?
>>>>>>
>>>>>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>> I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.
>>>>>>
>>>>>> try {
>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>     if (completedCheckpoint != null) {
>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>     }
>>>>>> } catch (Exception e) {
>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>         "checkpoint store.", e);
>>>>>>     // remove the checkpoint with broken state handle
>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>> }
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> So this is the issue; tell us if this is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the exact same checkpoint that had succeeded before the NN screwed us). When the JM tried to recreate the state and, b'coz the NN was down, failed to retrieve the CHK handle from HDFS, it conveniently (and I think very wrongly) removed the CHK from consideration and cleaned up the pointer (though that cleanup failed too, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on HDFS, and failure to retrieve it should have been a stop-all, not an attempt at magic exception handling that ends with starting from a blank state.
>>>>>>>
>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Also note that the ZooKeeper recovery (sadly on the same HDFS cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>>>>>>>
>>>>>>>> -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>> -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>
>>>>>>>> This is getting a little interesting.
>>>>>>>> What say you :)
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Another thing I noted was this:
>>>>>>>>>
>>>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>
>>>>>>>>> Generally, what Flink does IMHO is replace the checkpoint directory with a new one. I see it happening now; every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, chk-44286. That was (I think) the last checkpoint successfully created before the NN had issues, but unlike usual Flink did not delete this chk-44286. It looks as if the job started with a blank slate. Does this strike a chord?
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Fabian,
>>>>>>>>>>
>>>>>>>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with Pub/Sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).
>>>>>>>>>>
>>>>>>>>>> Coming back to this issue. We have the same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source, Kafka, is not applicable. I am totally certain that the retention time was not an issue: it is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>
>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>
>>>>>>>>>> * setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers is left at the default, true. I bring this up to see whether Flink will, in any circumstance, drive consumption from the Kafka-perceived offset rather than the one in the checkpoint.
>>>>>>>>>>
>>>>>>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>
>>>>>>>>>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>>>>>>>
>>>>>>>>>> * Other HDFS-related properties:
>>>>>>>>>>
>>>>>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>>>
>>>>>>>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>>>>>>>
>>>>>>>>>> Thank you for your patience and regards,
>>>>>>>>>>
>>>>>>>>>> Vishal
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>>>>>>>
>>>>>>>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually recovered successfully (i.e., without an error) from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.
>>>>>>>>>>>
>>>>>>>>>>> Restoring from a savepoint is not so different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and are usually discarded once the job is terminated, while savepoints are retained.
>>>>>>>>>>>
>>>>>>>>>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>
>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As far as I know, checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>>>>>>>>>
>>>>>>>>>>>>> * HDFS went into safe mode b'coz of NameNode issues
>>>>>>>>>>>>> * this exception was thrown:
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>>>>>>>> ..................
>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>
>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint failures, after the HDFS issues were resolved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my Kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had the partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned on.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>>>
>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a savepoint work as expected?
>>>>>>>>>>>>> * And though I am pretty sure, are operators like the window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
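
For illustration, here is a minimal sketch of the exception discrimination Vishal asks for above. This is not Flink's actual ZooKeeperCompletedCheckpointStore code; the CheckpointHandle interface and the recoverOne method are hypothetical stand-ins. The idea is that an I/O failure against the checkpoint store (e.g. the NameNode sitting in safe mode) aborts the recovery instead of discarding the checkpoint reference, and only a handle that is genuinely unreadable for other reasons is dropped.

import java.io.IOException;

public class CheckpointRecoverySketch {

    // Hypothetical stand-in for a completed-checkpoint handle referenced from ZooKeeper.
    interface CheckpointHandle {
        String path();
        Object retrieve() throws IOException; // may fail while HDFS is unavailable
    }

    static Object recoverOne(CheckpointHandle handle) throws IOException {
        try {
            return handle.retrieve();
        } catch (IOException e) {
            // Transient storage outage: fail the recovery loudly instead of
            // silently dropping the checkpoint and restarting from empty state.
            throw new IOException("Could not read checkpoint at " + handle.path()
                + "; aborting recovery instead of discarding it.", e);
        } catch (Exception e) {
            // Only a handle that is broken (not merely unreachable) is removed
            // from the completed checkpoint store.
            System.err.println("Removing broken checkpoint handle " + handle.path());
            return null;
        }
    }
}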
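
For reference, a sketch (against Flink 1.3-era connector APIs) of the Kafka consumer setup discussed in the thread. The topic name, bootstrap servers, and group.id are placeholders. With checkpointing enabled, a restored job resumes from the offsets stored in the checkpoint; the offsets committed back to Kafka via setCommitOffsetsOnCheckpoints(true) (the default) are only used when the job starts without any checkpoint or savepoint.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ConsumerSetupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One checkpoint per minute, matching the interval mentioned in the thread.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");   // placeholder

        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        // Default behaviour: offsets are committed to Kafka on completed checkpoints,
        // but a restore always uses the offsets stored in the checkpoint itself.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("consumer-setup-sketch");
    }
}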