Hi Vishal, Some relevant Jira issues for you are:
- https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
- https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
- https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

Best,
Aljoscha

> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Vishal,
>
> it would be great if you could create a JIRA ticket with Blocker priority.
> Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>
> Thanks for looking into this!
>
> Best regards,
> Fabian
>
> [1] https://lists.apache.org/list.html?user@flink.apache.org
>
> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
> Thank you for confirming.
>
> I think this is a critical bug. In essence, any checkpoint store (hdfs/S3/File) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that "failed checkpoints kill the job", b'coz essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless of course you have a retry of none, or an unbounded retry delay that you hope the store revives within). Remember, the first retry failure will cause new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".
>
> In our case it is very important that we do not undercount a window (one reason we use Flink and its awesome failure guarantees), as various alarms sound (we do anomaly detection on the time series).
>
> Please create a JIRA ticket for us to follow, or we could do it.
>
> PS: Not aborting on checkpoint failure, up to a configurable limit, is very important too.
>
> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi Vishal,
>
> I think you're right! And thanks for looking into this so deeply.
>
> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>
> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.
>
> Best,
> Aljoscha
>
>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.
>>
>>     try {
>>         completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>         if (completedCheckpoint != null) {
>>             completedCheckpoints.add(completedCheckpoint);
>>         }
>>     } catch (Exception e) {
>>         LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>             "checkpoint store.", e);
>>
>>         // remove the checkpoint with broken state handle
>>         removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>     }
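>>
>> For illustration only, a rough sketch of the distinction I mean. The isRecoverable helper is my own invention, not existing Flink code; LOG, removeBrokenStateHandle and checkpointStateHandle are the ones from the snippet above.
>>
>>     // Hypothetical helper: treat anything caused by an I/O failure (e.g. HDFS in
>>     // safe mode, NN down) as recoverable, so the state handle is kept rather than removed.
>>     private static boolean isRecoverable(Throwable t) {
>>         for (Throwable cur = t; cur != null; cur = cur.getCause()) {
>>             if (cur instanceof java.io.IOException) {
>>                 return true;
>>             }
>>         }
>>         return false;
>>     }
>>
>>     // and in the catch block above, something like:
>>     } catch (Exception e) {
>>         if (isRecoverable(e)) {
>>             // transient store outage: fail the recovery and keep the checkpoint for a later retry
>>             throw new FlinkException("Could not retrieve checkpoint from a temporarily unavailable store.", e);
>>         }
>>         LOG.warn("Could not retrieve checkpoint. Removing it from the completed checkpoint store.", e);
>>         removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>     }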
>>
>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> So this is the issue; tell us if it is wrong. ZK had some state (backed by hdfs) that referred to a checkpoint (the exact same checkpoint that was last successful before NN screwed us). When the JM tried to recreate the state it failed, b'coz NN was down, to retrieve the CHK handle from hdfs, and conveniently (and I think very wrongly) removed the CHK from being considered and cleaned the pointer (though the cleanup failed, as NN was down, which is obvious from the dangling file in recovery). The metadata itself was on hdfs, and a failure to retrieve it should have been a stop-all exception, not an attempt at magic that ends up starting from a blank state.
>>
>>     org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>
>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Also note that the zookeeper recovery (sadly on the same hdfs cluster) showed the same behavior. It has the pointers to the chk point (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>
>>     -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>     -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>
>> This is getting a little interesting. What say you :)
>>
>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Another thing I noted was this:
>>
>>     drwxr-xr-x   - root hadoop   0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>     drwxr-xr-x   - root hadoop   0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>
>> Generally what Flink does, IMHO, is replace the chk point directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory and hence the chk-44286 directory. chk-44286 was (I think) the last checkpoint successfully created before NN had issues, but contrary to the usual behavior it was not deleted. It looks as if the job started with a blank slate ???????? Does this strike a chord ?????
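>>
>> (For reference, a code-level sketch of the checkpointing setup that produces that once-a-minute directory churn. Treat it as illustrative only; the path is the one from our config, and the commented-out retention line is just to show the knob that deliberately keeps completed checkpoint data around for a manual restore.)
>>
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
>>     env.enableCheckpointing(60_000); // a new chk-NNNNN directory roughly every minute
>>     // env.getCheckpointConfig().enableExternalizedCheckpoints(
>>     //         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);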
>>
>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Hello Fabian,
>>
>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time etc.).
>>
>> Coming back to this issue. We have that same kafka topic feeding a streaming druid datasource and we do not see any issue there, so data loss at the source, kafka, is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay kafka with a new consumer group.id and that worked fine.
>>
>> Note these properties and see if they strike a chord.
>>
>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is the default true. I bring this up to see whether flink will in any circumstance drive consumption from the kafka-perceived offset rather than the one in the checkpoint.
>>
>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, therefore IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>
>> * We have a good sense of SP (save point) and CP (checkpoint) and certainly understand that they are actually not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>
>> * Other hdfs related properties:
>>
>>     state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>     state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>     recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>
>> Do these make sense? Is there anything else I should look at? Please also note that it is the second time this has happened. The first time I was vacationing and was not privy to the state of the flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>
>> Thank you for your patience and regards,
>>
>> Vishal
>>
>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi Vishal,
>>
>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.
>>
>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>
>> Regarding your question whether a failed checkpoint should cause the job to fail and recover: I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
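>>
>> (To make the offset/window-state point concrete, here is a rough, simplified sketch of such a program. The broker, topic, and parsing are made up, and I use a single-argument reduce instead of your ReduceFunction + WindowFunction pair.)
>>
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.enableCheckpointing(60_000); // Kafka offsets and window contents are snapshotted together
>>
>>     Properties props = new Properties();
>>     props.setProperty("bootstrap.servers", "kafka:9092"); // made up
>>     props.setProperty("group.id", "my-group");            // made up
>>
>>     FlinkKafkaConsumer010<String> source =
>>             new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props); // made-up topic
>>     // Default is true. The offsets committed back to Kafka only expose progress;
>>     // on recovery Flink starts from the offsets stored in the checkpoint, never from these.
>>     source.setCommitOffsetsOnCheckpoints(true);
>>
>>     env.addSource(source)
>>        .map(new MapFunction<String, Tuple2<String, Long>>() {
>>            @Override
>>            public Tuple2<String, Long> map(String line) {
>>                return Tuple2.of(line, 1L); // made-up parsing: count one per record
>>            }
>>        })
>>        .keyBy(0)
>>        .timeWindow(Time.minutes(15))
>>        .allowedLateness(Time.seconds(60))
>>        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
>>            @Override
>>            public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
>>                return Tuple2.of(a.f0, a.f1 + b.f1); // per-key counts held as window state
>>            }
>>        })
>>        .print();
>>
>>     env.execute("checkpointing sketch");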
>>
>> Best,
>> Fabian
>>
>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>> To add to it, my pipeline is a simple
>>
>>     keyBy(0)
>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>         .reduce(new ReduceFunction(), new WindowFunction())
>>
>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Hello folks,
>>
>> As far as I know, checkpoint failure should be ignored and retried, with potentially larger state. I had this situation:
>>
>> * hdfs went into a safe mode b'coz of Name Node issues
>> * this exception was thrown
>>
>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>     ..................
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>
>> * The pipeline came back after a few restarts and checkpoint failures, once the hdfs issues were resolved.
>>
>> I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my kafka consumer that kept on advancing its offset between a restart and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost. I have a 15 minute window of counts on a keyed operator.
>>
>> I am using ROCKS DB and of course have checkpointing turned on.
>>
>> The questions thus are:
>>
>> * Should a pipeline be restarted if a checkpoint fails?
>> * Why, on restart, was the operator state not recreated?
>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a save point work as expected?
>> * And though I am pretty sure, are operators like the Window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), is the state managed by flink?
>>
>> Thanks.