Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once in production.
On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

Some relevant JIRA issues for you are:

- https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
- https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
- https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

Best,
Aljoscha

On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

it would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.

Thanks for looking into this!

Best regards,
Fabian

[1] https://lists.apache.org/list.html?user@flink.apache.org

2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

Thank you for confirming.

I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirmation that failed checkpoints kill the job, because it essentially means that if the remote store is unavailable during a checkpoint then you have lost state (unless, of course, you have a retry of none or an unbounded retry delay, a delay that you *hope* the store revives within). Remember, per the code as written, the first retry failure will create new state iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".

In our case it is very important that we do not undercount a window, one reason we use Flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).

Please create a JIRA ticket for us to follow, or we could do it.

PS: Not aborting on checkpoint failure, up to a configurable limit, is very important too.

On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

I think you're right! And thanks for looking into this so deeply.

With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?

Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.

Best,
Aljoscha

On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I think this is the offending piece. There is a catch-all Exception which, IMHO, should distinguish a recoverable exception from an unrecoverable one.

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (Exception e) {
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
            "checkpoint store.", e);
        // remove the checkpoint with broken state handle
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }
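(For illustration only, not the actual Flink code or the eventual fix tracked in FLINK-7783: the distinction asked for above could look roughly like the sketch below. The identifiers are taken from the quoted snippet; the IOException branch, its message, and the assumption that a store outage surfaces as an IOException are hypothetical.)

    // Sketch only. Assumes: import java.io.IOException;
    //                       import org.apache.flink.util.FlinkException;
    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (IOException e) {
        // Hypothetical: the backing store (e.g. HDFS in safe mode) is merely
        // unavailable. Abort the recovery attempt instead of discarding the handle,
        // so the checkpoint can still be restored once the store is back.
        throw new FlinkException("Could not retrieve checkpoint from state handle under "
            + checkpointStateHandle.f1 + "; the state storage appears to be unavailable. "
            + "Keeping the handle and aborting recovery.", e);
    } catch (Exception e) {
        // Only a genuinely broken handle is dropped from the completed checkpoint store.
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed "
            + "checkpoint store.", e);
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }

FLINK-4815 (linked above) goes a step further and proposes falling back to an earlier checkpoint when the restore fails, rather than aborting outright.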
On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

So this is the issue; tell us if it is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the same, exact, last checkpoint that was successful before the NN screwed us). When the JM tried to recreate the state and, because the NN was down, failed to retrieve the CHK handle from HDFS, it conveniently (and I think very wrongly) removed the CHK from consideration and cleaned up the pointer (though that deletion failed too, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure to retrieve it should have been a stop-everything exception, not an invitation to try some magic and start from a blank state.

org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.

On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Also note that the ZooKeeper recovery data (sadly on the same HDFS cluster) showed the same behavior. It holds the pointers to the checkpoint (I think that is what it does: keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.

-rw-r--r-- 3 root hadoop 7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
-rw-r--r-- 3 root hadoop 7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you? :)

On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Another thing I noted was this:

drwxr-xr-x - root hadoop 0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
drwxr-xr-x - root hadoop 0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428

Generally, what Flink does, IMHO, is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, and hence chk-44286 remained. That was (I think) the last checkpoint successfully created before the NN had issues, but, unusually, it was not deleted. It looks as if the job started with a blank slate?! Does this strike a chord?
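(Side note on the directory lifecycle described above: if retaining checkpoint data deliberately would help with this kind of forensics or manual recovery, externalized checkpoints can be enabled. A minimal sketch, assuming a Flink version with the CheckpointConfig API for externalized checkpoints; the one-minute interval is illustrative. A retained checkpoint can then, as far as I know, be resumed much like a savepoint, e.g. flink run -s <path-to-checkpoint-metadata>.)

    // Sketch only. Assumes: import org.apache.flink.streaming.api.environment.CheckpointConfig;
    //                       import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // one checkpoint per minute, as observed above
    // Retain checkpoint data instead of deleting it on cancellation or failure, so a
    // checkpoint such as chk-44286 can be inspected or resumed manually later.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);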
On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello Fabian,

First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF's integration with pub/sub is organic, and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).

Coming back to this issue. We have that same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source (Kafka) is not applicable. I am totally certain that the retention time was not an issue: it is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.

Note these properties and see if they strike a chord.

* setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers is at the default, true. I bring this up to see whether Flink will, under any circumstance, drive consumption from the Kafka-perceived offset rather than the one in the checkpoint.

* state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.

* We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.

* Other HDFS-related properties:

state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>

state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>

recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>

Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.

Thank you for your patience and regards,

Vishal
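(To make the first bullet concrete, a minimal sketch of how that consumer setting is usually applied. The broker address, group id and topic name are placeholders, and the exact connector class and package paths depend on the Flink/Kafka versions in use. As far as I understand, the offsets committed back to Kafka only serve as the start position for a job that has no checkpoint or savepoint to restore from; a restored job continues from the offsets stored in the checkpoint itself.)

    // Sketch only. Assumes: import java.util.Properties;
    //                       import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    //                       import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
    props.setProperty("group.id", "prod-pipeline");              // placeholder

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props); // placeholder topic

    // The default is already true: offsets are committed back to Kafka on completed
    // checkpoints. These commits are informational / a fallback start position only;
    // on recovery the consumer resumes from the checkpointed offsets.
    consumer.setCommitOffsetsOnCheckpoints(true);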
On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).

Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually recovered successfully (i.e., without an error) from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.

Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.

Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.

Best, Fabian

2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

To add to it, my pipeline is a simple

    keyBy(0)
        .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
        .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
        .reduce(new ReduceFunction(), new WindowFunction())

On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello folks,

As far as I know, a checkpoint failure should be ignored and retried with potentially larger state. I had this situation:

* HDFS went into safe mode because of NameNode issues
* an exception was thrown:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
..................
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)

* The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.

I would not have worried about the restart, but it was evident that I lost my operator state. Either my Kafka consumer kept advancing its offsets between a start and the next checkpoint failure (a minute's worth), or the operator that held the partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.

I am using RocksDB and of course have checkpointing turned on.

The questions thus are:

* Should a pipeline be restarted if a checkpoint fails?
* Why, on restart, was the operator state not recreated?
* Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
* And, though I am pretty sure of it, are operators like the window operator stateful by default, so that if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?

Thanks.
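(For reference, a self-contained sketch of the kind of job described in this thread: RocksDB state backend, checkpointing every minute, keyed 15-minute event-time windows with an incremental reduce plus a window function. Everything here is illustrative: the in-memory source and timestamp extractor stand in for the Kafka consumer and the event's own timestamps, the checkpoint path is a local placeholder for the HDFS directory mentioned above, and the window sizes and types are made up rather than taken from the actual production job.)

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class WindowedCountJobSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Checkpoint every minute; keyed window state lives in RocksDB. The
            // production setup in this thread points the backend at hdfs:///flink-checkpoints/...
            env.enableCheckpointing(60_000);
            env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));

            // Stand-in for the Kafka source; the timestamp extractor is a placeholder
            // for extracting the event's own timestamp.
            DataStream<Tuple2<String, Long>> counts = env
                .fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-a", 2L), Tuple2.of("key-b", 1L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                        return System.currentTimeMillis();
                    }
                });

            counts
                .keyBy(0)
                .timeWindow(Time.of(15, TimeUnit.MINUTES))
                .allowedLateness(Time.of(30, TimeUnit.SECONDS))
                .reduce(
                    // Incrementally sum the counts per key so the window state stays small.
                    new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                            return Tuple2.of(a.f0, a.f1 + b.f1);
                        }
                    },
                    // The window function only forwards the single pre-aggregated result per key and window.
                    new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple key, TimeWindow window,
                                          Iterable<Tuple2<String, Long>> reduced,
                                          Collector<Tuple2<String, Long>> out) {
                            out.collect(reduced.iterator().next());
                        }
                    })
                .print();

            env.execute("windowed-count-sketch");
        }
    }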