I think this is the offending piece. There is a catch-all Exception handler which, IMHO, should distinguish a recoverable exception from an unrecoverable one.
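Something along these lines is roughly what I have in mind. This is only a sketch against the quoted fragment below, not a patch against the actual class: I am assuming the storage outage surfaces as an IOException, and the FlinkException wrapping plus the helper/field names are just carried over from the snippet.

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (IOException ioe) {
        // Assumed: storage unavailable (e.g. HDFS safe mode / NN failover).
        // This is recoverable, so fail the recovery attempt instead of silently
        // dropping the checkpoint from the store. The enclosing method would
        // have to declare/propagate this.
        throw new FlinkException(
            "Could not retrieve checkpoint " + checkpointStateHandle.f1 +
            " because the backing storage is unavailable. Failing recovery " +
            "instead of discarding the checkpoint.", ioe);
    } catch (Exception e) {
        // Anything else is treated as a genuinely broken/corrupt handle.
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
            "checkpoint store.", e);
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }

For reference, here is the snippet as it stands today, which swallows everything: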
    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (Exception e) {
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
            "checkpoint store.", e);
        // remove the checkpoint with broken state handle
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }

On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> So this is the issue; tell me if this reading is wrong. ZK had some state (backed by hdfs) that referred to a checkpoint (the exact same checkpoint that completed successfully before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from hdfs b'coz the NN was down, and conveniently (and I think very wrongly) removed the CHK from consideration and cleaned up the pointer (though that cleanup also failed, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on hdfs, and a failure to retrieve it should have been a stop-everything exception, not an excuse to try some magic and effectively start from a blank state.
>
> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>
> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Also note that the zookeeper recovery (sadly on the same hdfs cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>
>> -rw-r--r--   3 root hadoop  7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>
>> -rw-r--r--   3 root hadoop  7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>
>> This is getting a little interesting. What say you :)
>>
>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Another thing I noted was this:
>>>
>>> drwxr-xr-x   - root hadoop  0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>
>>> drwxr-xr-x   - root hadoop  0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>
>>> Generally what Flink does, IMHO, is replace the checkpoint directory with a new one; I see it happening now, every minute the old directory is replaced. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, i.e. chk-44286. That was (I think) the last checkpoint successfully created before the NN had issues, yet contrary to the usual behavior it was never deleted. It looks as if the job started with a blank slate. Does this strike a chord?
>>>
>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Hello Fabian,
>>>>
>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).
>>>>
>>>> Coming back to this issue.
>>>> We have that same kafka topic feeding a streaming druid datasource and we do not see any issue there, so data loss at the source (kafka) is not applicable. I am totally certain that the "retention" time was not an issue: it is 4 days of retention and we fixed this issue within 30 minutes. We could replay kafka with a new consumer group.id and that worked fine.
>>>>
>>>> Note these properties and see if they strike a chord.
>>>>
>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is the default, true. I bring this up to see whether flink will in any circumstance drive consumption from the kafka-perceived offset rather than the one in the checkpoint.
>>>>
>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>
>>>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>
>>>> * Other hdfs-related properties:
>>>>
>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>
>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>
>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>
>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>
>>>> Thank you for your patience and regards,
>>>>
>>>> Vishal
>>>>
>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>
>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.
>>>>>
>>>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>
>>>>>> To add to it, my pipeline is a simple
>>>>>>
>>>>>> keyBy(0)
>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>
>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello folks,
>>>>>>>
>>>>>>> As far as I know, checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>>>
>>>>>>> * hdfs went into safe mode b'coz of Name Node issues
>>>>>>> * this exception was thrown:
>>>>>>>
>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>> ..................
>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>
>>>>>>> * The pipeline came back after a few restarts and checkpoint failures, once the hdfs issues were resolved.
>>>>>>>
>>>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my kafka consumer that kept on advancing its offset between a restart and the next checkpoint failure (a minute's worth), or the operator that held the partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.
>>>>>>>
>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>
>>>>>>> The questions thus are:
>>>>>>>
>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a savepoint work as expected?
>>>>>>> * And though I am pretty sure, are operators like the Window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by flink?
>>>>>>>
>>>>>>> Thanks.
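P.S. For anyone following along, below is a minimal, self-contained sketch of the kind of job discussed in this thread, against the Flink 1.3-era APIs. The topic name, record format, consumer properties, checkpoint path, window sizes and the reduce/window functions are placeholders of mine, not the actual production code.

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;

    public class KeyedCountsJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // RocksDB state backend checkpointing to HDFS, one checkpoint per minute.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
            env.enableCheckpointing(60_000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "keyed-counts");        // placeholder

            FlinkKafkaConsumer010<String> consumer =
                    new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
            // true is the default: offsets are committed back to Kafka on completed
            // checkpoints, but a restore starts from the offsets in the checkpoint itself.
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer)
                    // Records are assumed to look like "timestampMillis,key".
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
                        @Override
                        public long extractAscendingTimestamp(String line) {
                            return Long.parseLong(line.split(",")[0]);
                        }
                    })
                    .map(new MapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(String line) {
                            return Tuple2.of(line.split(",")[1], 1L);
                        }
                    })
                    .keyBy(0)
                    .timeWindow(Time.minutes(15))
                    .allowedLateness(Time.seconds(60))
                    // Incremental count via reduce; the WindowFunction only stamps the window end.
                    .reduce(
                            new ReduceFunction<Tuple2<String, Long>>() {
                                @Override
                                public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                                   Tuple2<String, Long> b) {
                                    return Tuple2.of(a.f0, a.f1 + b.f1);
                                }
                            },
                            new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                                @Override
                                public void apply(Tuple key, TimeWindow window,
                                                  Iterable<Tuple2<String, Long>> counts,
                                                  Collector<String> out) {
                                    Tuple2<String, Long> c = counts.iterator().next();
                                    out.collect(window.getEnd() + "," + c.f0 + "," + c.f1);
                                }
                            })
                    .print();

            env.execute("keyed 15 minute counts");
        }
    }

The setCommitOffsetsOnCheckpoints(true) call just makes the default explicit; on a restore the consumer picks up the offsets stored in the checkpoint, not the ones committed to Kafka. And as Fabian notes, resuming from a savepoint is the same mechanism; with the CLI that is, IIRC, `flink run -s <savepointPath> ...`.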