Another thing I noted was this:

drwxr-xr-x - root hadoop 0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
drwxr-xr-x - root hadoop 0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428

Generally, IMHO, what Flink does is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, chk-44286. That was (I think) the last checkpoint successfully created before the NN had issues, but unlike usual it was never deleted. It looks as if the job started with a blank slate? Does this strike a chord?
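By default Flink retains only the most recent completed checkpoint (state.checkpoints.num-retained defaults to 1), which would explain the once-a-minute replacement of the chk-* directory. Below is a minimal sketch, assuming Flink 1.3.x, of how a job can additionally externalize and retain a checkpoint so a chk-* directory deliberately survives a cancellation or an incident; the 60-second interval and class name are illustrative only, not taken from the job above.

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRetentionSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s, matching the once-a-minute chk-* turnover seen above.
        env.enableCheckpointing(60_000);

        // Keep the externalized checkpoint even if the job is cancelled, so a chk-*
        // directory survives an incident and can be used for a manual restore.
        // Requires state.checkpoints.dir to be set in flink-conf.yaml.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... add sources, windowing, sinks, then call env.execute() ...
    }
}

Raising state.checkpoints.num-retained in flink-conf.yaml would keep the N most recent completed checkpoints, i.e. more than one chk-* directory at a time.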
On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Hello Fabian,
>
> First of all, congratulations on this fabulous framework. I have worked
> with GDF, and though GDF has some natural pluses, Flink's state management
> is far more advanced. With Kafka as a source it negates issues GDF has
> (GDF's integration with pub/sub is organic and that is to be expected, but
> non-FIFO pub/sub is an issue with windows on event time, etc.).
>
> Coming back to this issue. We have the same Kafka topic feeding a
> streaming Druid datasource and we do not see any issue there, so data loss
> at the source (Kafka) does not apply. I am totally certain that the
> "retention" time was not an issue: it is 4 days of retention and we fixed
> this issue within 30 minutes. We could replay Kafka with a new consumer
> group.id and that worked fine.
>
> Note these properties and see if they strike a chord.
>
> * The setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers is
> the default, true. I bring this up to see whether Flink will in any
> circumstance drive consumption from the Kafka-committed offset rather than
> the one in the checkpoint (see the consumer sketch after this message).
>
> * state.backend.fs.memory-threshold: 0 has not been set. The state is big
> enough, though, so IMHO there is no way the state is stored along with the
> metadata in the JM (or ZK?). The reason I bring this up is to make sure
> that when you say the size has to be less than 1024 bytes, you are talking
> about the cumulative state of the pipeline.
>
> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly
> understand that they are not dissimilar. However, in this case there were
> multiple attempts to restart the pipe before it finally succeeded.
>
> * Other HDFS-related properties:
>
> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>
> Do these make sense? Is there anything else I should look at? Please also
> note that this is the second time this has happened. The first time I was
> vacationing and was not privy to the state of the Flink pipeline, but the
> net effect was similar: the counts for the first window after an internal
> restart dropped.
>
> Thank you for your patience and regards,
>
> Vishal
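On the setCommitOffsetsOnCheckpoints(boolean) point above: a minimal sketch of the relevant consumer settings, assuming Flink 1.3.x with FlinkKafkaConsumer010; the topic name, broker address, and group.id are placeholders, not the actual job's values. When a job is restored from a checkpoint or savepoint, the consumer starts from the offsets stored in that checkpoint; the offsets committed back to Kafka are only used as the start position when there is no state to restore, and otherwise serve as a progress indicator.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaOffsetSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder broker address
        props.setProperty("group.id", "prod-counts");          // placeholder group id

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);

        // Commit offsets back to Kafka on completed checkpoints (the default).
        // These committed offsets are a monitoring aid; on recovery Flink uses
        // the offsets stored in the checkpoint, not the ones committed to Kafka.
        consumer.setCommitOffsetsOnCheckpoints(true);

        // Start position used ONLY when there is no checkpoint/savepoint to restore from.
        consumer.setStartFromGroupOffsets();

        DataStream<String> events = env.addSource(consumer);
        events.print();

        env.execute("kafka-offset-sketch");
    }
}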
> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> Window operators are always stateful because the operator needs to
>> remember previously received events (WindowFunction) or intermediate
>> results (ReduceFunction).
>> Given the program you described, a checkpoint should include the Kafka
>> consumer offset and the state of the window operator. If the program
>> eventually recovered successfully (i.e., without an error) from the last
>> checkpoint, all its state should have been restored. Since the last
>> checkpoint was taken before HDFS went into safe mode, the program would
>> have been reset to that point. If the Kafka retention time is less than
>> the time it took to fix HDFS, you would have lost data because it would
>> have been removed from Kafka. If that's not the case, we need to
>> investigate this further, because a checkpoint recovery must not result
>> in state loss.
>>
>> Restoring from a savepoint is not so different from automatic checkpoint
>> recovery. Given that you have a completed savepoint, you can restart the
>> job from that point. The main difference is that checkpoints are only
>> used for internal recovery and usually discarded once the job is
>> terminated, while savepoints are retained.
>>
>> Regarding your question whether a failed checkpoint should cause the job
>> to fail and recover, I'm not sure what the current status is.
>> Stefan (in CC) should know what happens if a checkpoint fails.
>>
>> Best, Fabian
>>
>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> To add to it, my pipeline is a simple
>>>
>>> keyBy(0)
>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>
>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Hello folks,
>>>>
>>>> As far as I know, a checkpoint failure should be ignored and retried
>>>> with potentially larger state. I had this situation:
>>>>
>>>> * HDFS went into safe mode because of NameNode issues
>>>> * an exception was thrown
>>>>
>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>> Operation category WRITE is not supported in state standby. Visit
>>>> https://s.apache.org/sbnn-error
>>>> ..................
>>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>
>>>> * The pipeline came back after a few restarts and checkpoint failures,
>>>> once the HDFS issues were resolved.
>>>>
>>>> I would not have worried about the restart, but it was evident that I
>>>> lost my operator state. Either the Kafka consumer kept advancing its
>>>> offset between a start and the next checkpoint failure (a minute's
>>>> worth), or the operator that held partial aggregates was lost. I have a
>>>> 15-minute window of counts on a keyed operator.
>>>>
>>>> I am using RocksDB and of course have checkpointing turned on.
>>>>
>>>> The questions thus are:
>>>>
>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>> * Why, on restart, was the operator state not recreated?
>>>> * Does the nature of the exception thrown have anything to do with
>>>> this, given that suspend and resume from a savepoint work as expected?
>>>> * And though I am pretty sure, are operators like the window operator
>>>> stateful by default, so that if I have timeWindow(Time.of(window_size,
>>>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()),
>>>> the state is managed by Flink?
>>>>
>>>> Thanks.
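For completeness, a self-contained sketch of the kind of job described above: RocksDB state backend, checkpointing, and a keyed 15-minute event-time window with an incremental ReduceFunction plus a WindowFunction. This assumes Flink 1.3.x with the flink-statebackend-rocksdb dependency; the input elements, HDFS path, key, and aggregation logic are placeholders, not the actual pipeline.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedCountsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60_000);

        // RocksDB state backend checkpointing to HDFS (placeholder path).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));

        // Placeholder input: (key, event timestamp, count). In the real job this comes from Kafka.
        DataStream<Tuple3<String, Long, Long>> events = env
                .fromElements(
                        Tuple3.of("a", 1_000L, 1L),
                        Tuple3.of("a", 2_000L, 1L),
                        Tuple3.of("b", 3_000L, 1L))
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<Tuple3<String, Long, Long>>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple3<String, Long, Long> e) {
                                return e.f1;
                            }
                        });

        events
                .keyBy(0)
                .timeWindow(Time.minutes(15))
                .allowedLateness(Time.seconds(60))
                .reduce(
                        // Incremental aggregation: only the running reduced value is kept as window state.
                        new ReduceFunction<Tuple3<String, Long, Long>>() {
                            @Override
                            public Tuple3<String, Long, Long> reduce(Tuple3<String, Long, Long> a,
                                                                     Tuple3<String, Long, Long> b) {
                                return Tuple3.of(a.f0, Math.max(a.f1, b.f1), a.f2 + b.f2);
                            }
                        },
                        // The WindowFunction sees the single pre-reduced element per key and window.
                        new WindowFunction<Tuple3<String, Long, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
                            @Override
                            public void apply(Tuple key, TimeWindow window,
                                              Iterable<Tuple3<String, Long, Long>> input,
                                              Collector<Tuple3<String, Long, Long>> out) {
                                for (Tuple3<String, Long, Long> v : input) {
                                    out.collect(v);
                                }
                            }
                        })
                .print();

        env.execute("windowed-counts-sketch");
    }
}

Both the Kafka offsets and the window contents above are part of operator state, so on a successful restore from the last completed checkpoint they should come back together, which is the expectation discussed in the thread.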