Ping. This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand-new pipeline.
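To make the request concrete, here is a minimal sketch of the intended "abort instead of starting fresh" behavior. The CheckpointStore interface and the abortOnMissingCheckpoint flag are hypothetical names for illustration only, not Flink's actual recovery API:

    // Hypothetical sketch: fail recovery loudly when a registered checkpoint
    // cannot be retrieved, instead of silently starting from a blank state.
    public final class RecoveryGuard {

        /** Hypothetical stand-in for whatever hands back the latest completed checkpoint. */
        public interface CheckpointStore {
            // Returns null if the checkpoint referenced in ZooKeeper cannot be read back.
            Object retrieveLatestCheckpoint() throws Exception;
        }

        public static Object recoverOrAbort(CheckpointStore store, boolean abortOnMissingCheckpoint)
                throws Exception {
            Object checkpoint = store.retrieveLatestCheckpoint();
            if (checkpoint == null && abortOnMissingCheckpoint) {
                // Abort: do not pretend this is a brand-new pipeline with empty state.
                throw new IllegalStateException(
                    "A completed checkpoint is registered but could not be retrieved; "
                        + "refusing to start from empty state.");
            }
            return checkpoint;
        }
    }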
On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once on production.
>
> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Vishal,
>>
>> Some relevant Jira issues for you are:
>>
>> - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>> - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>> - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>
>> Best,
>> Aljoscha
>>
>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Vishal,
>>
>> It would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>>
>> Thanks for looking into this!
>>
>> Best regards,
>> Fabian
>>
>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>
>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Thank you for confirming.
>>>
>>> I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, b'coz essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless, of course, you have a retry of none or an unbounded retry delay, a delay that you *hope* the store revives in). Remember, the first retry failure will create new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like "abort_retry_on_chkretrevalfailure".
>>>
>>> In our case it is very important that we do not undercount a window, one reason we use Flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).
>>>
>>> Please create a JIRA ticket for us to follow, or we could do it.
>>>
>>> PS: Not aborting on checkpoint failure, up to a configurable limit, is very important too.
>>>
>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> I think you're right! And thanks for looking into this so deeply.
>>>>
>>>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>>>>
>>>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour but there are plans to change this.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>> I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.
>>>>
>>>> try {
>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>     if (completedCheckpoint != null) {
>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>     }
>>>> } catch (Exception e) {
>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>         "checkpoint store.", e);
>>>>     // remove the checkpoint with broken state handle
>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>> }
>>>>
>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> So this is the issue; tell us if we have it wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the exact same checkpoint that completed successfully before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from HDFS b'coz the NN was down, and conveniently (and I think very wrongly) removed the CHK from consideration and cleaned the pointer (though that deletion also failed, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure to retrieve it should have been a stop-all, not an attempt to do magic with the exception and start from a blank state.
>>>>>
>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>>>>
>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Also note that the ZooKeeper recovery (sadly on the same HDFS cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>>>>>
>>>>>> -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>> -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>
>>>>>> This is getting a little interesting. What say you :)
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Another thing I noted was this:
>>>>>>>
>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>
>>>>>>> Generally what Flink does, IMHO, is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, i.e. chk-44286. This was (I think) the last checkpoint successfully created before the NN had issues, but unlike usual Flink did not delete chk-44286. It looks as if the job started with a blank slate. Does this strike a chord?
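A minimal sketch of the kind of distinction asked for in the snippet quoted above, reusing its names (retrieveCompletedCheckpoint, removeBrokenStateHandle, checkpointStateHandle, LOG); this is an illustration of the idea, not the actual fix that landed in Flink, and it assumes java.io.IOException as the marker for a transient store outage:

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (IOException e) {
        // Recoverable: the backing store (e.g. HDFS in safe mode) is unreachable.
        // Keep the handle and abort recovery so it can be retried later.
        throw new FlinkException(
            "Checkpoint store is temporarily unavailable; aborting recovery instead of dropping "
                + checkpointStateHandle.f1, e);
    } catch (Exception e) {
        // Unrecoverable: the state handle itself is broken; only then remove it.
        LOG.warn("Removing broken checkpoint state handle {}.", checkpointStateHandle.f1, e);
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }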
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Fabian,
>>>>>>>>
>>>>>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).
>>>>>>>>
>>>>>>>> Coming back to this issue. We have the same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source, Kafka, is not applicable. I am totally certain that the retention time was not an issue: it is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.
>>>>>>>>
>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>
>>>>>>>> * setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers is left at the default, true. I bring this up to see whether Flink will in any circumstance drive consumption from the Kafka-perceived offset rather than the one in the checkpoint.
>>>>>>>>
>>>>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>
>>>>>>>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>>>>>
>>>>>>>> * Other HDFS-related properties:
>>>>>>>>
>>>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>
>>>>>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>>>>>
>>>>>>>> Thank you for your patience and regards,
>>>>>>>>
>>>>>>>> Vishal
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> Window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator.
>>>>>>>>> If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.
>>>>>>>>>
>>>>>>>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>>>>>>>>
>>>>>>>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>
>>>>>>>>>> keyBy(0)
>>>>>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello folks,
>>>>>>>>>>>
>>>>>>>>>>> As far as I know, a checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>>>>>>>
>>>>>>>>>>> * HDFS went into safe mode b'coz of Name Node issues
>>>>>>>>>>> * an exception was thrown
>>>>>>>>>>>
>>>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>>>>>> ..................
>>>>>>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>
>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint failures, after the HDFS issues were resolved.
>>>>>>>>>>>
>>>>>>>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either my Kafka consumer kept advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that held the partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.
>>>>>>>>>>>
>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned on.
>>>>>>>>>>>
>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>
>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a savepoint work as expected?
>>>>>>>>>>> * And though I am pretty sure, are operators like the window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
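For reference, a minimal, self-contained sketch of the kind of job described in this thread: a Kafka source with offsets committed on checkpoints, a keyed 15-minute event-time window with allowed lateness, a ReduceFunction plus WindowFunction, and a RocksDB state backend checkpointing to HDFS. It assumes Flink 1.3-era APIs (FlinkKafkaConsumer010, RocksDBStateBackend); the topic, group.id, parsing, and timestamp extraction are placeholders, not the actual production job.

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;

    public class KeyedWindowCountJob {

        public static void main(String[] args) throws Exception {
            final int windowSize = 15; // minutes, as in the thread
            final int lateBy = 30;     // seconds, placeholder

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // Checkpoint every minute into the HDFS-backed RocksDB state backend.
            env.enableCheckpointing(60_000);
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "window-counts");       // placeholder

            FlinkKafkaConsumer010<String> consumer =
                    new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
            // Default is true: offsets are committed back to Kafka on checkpoints,
            // but restores are driven by the offsets stored in the checkpoint.
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer)
                    // Placeholder parsing: "key,..." -> (key, 1L)
                    .map(new MapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(String value) {
                            return Tuple2.of(value.split(",")[0], 1L);
                        }
                    })
                    .assignTimestampsAndWatermarks(
                            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(lateBy)) {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> element) {
                                    return System.currentTimeMillis(); // placeholder; use the real event timestamp
                                }
                            })
                    .keyBy(0)
                    .timeWindow(Time.of(windowSize, TimeUnit.MINUTES))
                    .allowedLateness(Time.of(lateBy, TimeUnit.SECONDS))
                    .reduce(
                            new ReduceFunction<Tuple2<String, Long>>() {
                                @Override
                                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                                    return Tuple2.of(a.f0, a.f1 + b.f1);
                                }
                            },
                            new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                                @Override
                                public void apply(Tuple key, TimeWindow window,
                                                  Iterable<Tuple2<String, Long>> counts,
                                                  Collector<Tuple2<String, Long>> out) {
                                    // Emit the single pre-reduced count per key and window.
                                    out.collect(counts.iterator().next());
                                }
                            })
                    .print();

            env.execute("keyed-window-count-sketch");
        }
    }

With a job like this, a successful checkpoint holds both the Kafka offsets and the partial window aggregates, which is why a recovery that silently drops the checkpoint shows up as undercounted windows rather than an outright failure.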