Also note that the ZooKeeper recovery data (sadly on the same HDFS cluster) showed the same behavior. It holds the pointers to the checkpoint (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
-rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
-rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you :)

On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Another thing I noted was this:
>
> drwxr-xr-x   - root hadoop   0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
> drwxr-xr-x   - root hadoop   0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>
> Generally, what Flink does IMHO is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, and hence the chk-44286 directory. That was (I think) the last checkpoint successfully created before the NN had issues but, as is usual, it did not delete this chk-44286. It looks as if the job started with a blank slate. Does this strike a chord?
>
> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Hello Fabian,
>>
>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).
>>
>> Coming back to this issue: the same Kafka topic feeds a streaming Druid datasource and we see no issue there, so data loss at the source (Kafka) does not apply. I am totally certain that the retention time was not an issue: it is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.
>>
>> Note these properties and see if they strike a chord.
>>
>> * setCommitOffsetsOnCheckpoints(boolean) on the Kafka consumers is left at its default, true. I bring this up to see whether Flink would, under any circumstance, drive consumption from the offsets Kafka has committed rather than the ones in the checkpoint.
>>
>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>
>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are not all that dissimilar. However, in this case there were multiple attempts to restart the pipeline before it finally succeeded.
>>
>> * Other HDFS-related properties:
>>
>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>
>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
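>>
>> For concreteness, the consumer is wired up roughly along these lines (a minimal sketch, assuming the 0.10 connector; the class name, topic, group.id, and broker list below are placeholders, not our exact values):
>>
>> import java.util.Properties;
>>
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>
>> public class ConsumerSetup {
>>     public static FlinkKafkaConsumer010<String> buildConsumer() {
>>         Properties props = new Properties();
>>         props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder
>>         props.setProperty("group.id", "prod-group");             // placeholder
>>
>>         FlinkKafkaConsumer010<String> consumer =
>>                 new FlinkKafkaConsumer010<>("prod-topic", new SimpleStringSchema(), props);
>>
>>         // Defaults we rely on: on a restore, the offsets come from the checkpoint/savepoint;
>>         // the group offsets committed to Kafka are only the fallback for a fresh start.
>>         consumer.setStartFromGroupOffsets();
>>         consumer.setCommitOffsetsOnCheckpoints(true);
>>         return consumer;
>>     }
>> }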
>>
>> Thank you for your patience and regards,
>>
>> Vishal
>>
>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> Window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>
>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually recovered successfully (i.e., without an error) from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.
>>>
>>> Restoring from a savepoint is not so different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and are usually discarded once the job is terminated, while savepoints are retained.
>>>
>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>
>>> Best, Fabian
>>>
>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> To add to it, my pipeline is a simple
>>>>
>>>>     keyBy(0)
>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>
>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hello folks,
>>>>>
>>>>> As far as I know, a checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>
>>>>> * HDFS went into safe mode because of NameNode issues
>>>>> * this exception was thrown:
>>>>>
>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>> Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>> ..................
>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>
>>>>> * The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.
>>>>>
>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either my Kafka consumer kept advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that held the partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.
>>>>>
>>>>> I am using RocksDB and of course have checkpointing turned on.
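>>>>>
>>>>> For reference, the relevant setup looks roughly like this (a minimal sketch; the class name, checkpoint interval, and HDFS path are illustrative):
>>>>>
>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>> public class CheckpointSetup {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         // RocksDB holds the working state locally and snapshots it to the HDFS path on each checkpoint.
>>>>>         env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
>>>>>
>>>>>         // Checkpoint roughly once a minute (interval is illustrative).
>>>>>         env.enableCheckpointing(60_000L);
>>>>>
>>>>>         // ... sources, keyBy/timeWindow/reduce pipeline, sinks, env.execute(...) ...
>>>>>     }
>>>>> }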
>>>>>
>>>>> The questions thus are:
>>>>>
>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>> * Why, on restart, was the operator state not recreated?
>>>>> * Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
>>>>> * And, though I am pretty sure of the answer, are operators like the window operator stateful by default, so that if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()) the state is managed by Flink? (A spelled-out version of this is in the P.S. below.)
>>>>>
>>>>> Thanks.
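>>>>>
>>>>> P.S. Spelling out that last pipeline with concrete function types, as a minimal sketch (the class name, the Tuple2<String, Long> element type, and the literal window/lateness values are illustrative, not our exact job):
>>>>>
>>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>>> import org.apache.flink.api.java.tuple.Tuple;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> public class WindowedCounts {
>>>>>     public static DataStream<Tuple2<String, Long>> count(DataStream<Tuple2<String, Long>> input) {
>>>>>         return input
>>>>>                 .keyBy(0)
>>>>>                 .timeWindow(Time.minutes(15))
>>>>>                 .allowedLateness(Time.seconds(60))
>>>>>                 .reduce(
>>>>>                         new ReduceFunction<Tuple2<String, Long>>() {
>>>>>                             @Override
>>>>>                             public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
>>>>>                                 // The running partial aggregate is Flink-managed window state (kept in RocksDB in our case).
>>>>>                                 return Tuple2.of(a.f0, a.f1 + b.f1);
>>>>>                             }
>>>>>                         },
>>>>>                         new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
>>>>>                             @Override
>>>>>                             public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Long>> reduced,
>>>>>                                               Collector<Tuple2<String, Long>> out) {
>>>>>                                 // With a ReduceFunction upstream, "reduced" holds exactly one pre-aggregated element per fired window.
>>>>>                                 out.collect(reduced.iterator().next());
>>>>>                             }
>>>>>                         });
>>>>>     }
>>>>> }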