To add to this, we are assuming that the default configuration will fail a pipeline if a checkpoint fails, and will enter the recovery loop only if the retry limit has not been reached.
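For reference, a minimal sketch of the kind of setup we are assuming here — the checkpoint interval, attempt count and delay are illustrative values, not the ones from our job:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 60s; with the behaviour assumed above, a failed checkpoint
    // fails the job, which is then handed to the restart strategy below.
    env.enableCheckpointing(60_000);

    // Retry limit: after 5 failed restart attempts (10s apart) the job is aborted for good.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10_000));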
On Thu, Jan 25, 2018 at 7:00 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Sorry. There are 2 scenarios:

* The idempotent-sinks use case, where we would want to restore from the latest valid checkpoint. If I understand the code correctly, we try to retrieve all completed checkpoints for all handles in ZK and abort (throw an exception) if there are handles but no corresponding completed checkpoints in HDFS; otherwise we use the latest valid checkpoint state. On abort, a restart and thus a restore of the pipeline is issued, repeating the above execution. If the failure in HDFS was transient, a retry will succeed; otherwise, when the retry limit is reached, the pipeline is aborted for good.

* Non-idempotent sinks, where we want no retries. We do not want to recover from the last available checkpoint, as the above code will do, because the further into history we go the more duplicates will be delivered. The only solution is to use the exactly-once semantics of the source and sinks, if possible.

On Wed, Jan 24, 2018 at 7:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Did you see my second mail?

On 24. Jan 2018, at 12:50, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

As in: if there are checkpoint handles in ZK, there should be no reason to start a new job (bad handle, no HDFS connectivity, etc.). Yes, that sums it up.

On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Wait a sec, I just checked out the code again and it seems we already do that: https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210

If there were some checkpoints but none could be read, we fail recovery.

On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:

That sounds reasonable: we would keep the first fix, i.e. never delete checkpoints if they're "corrupt", only when they're subsumed. Additionally, we fail the job if there are some checkpoints in ZooKeeper but none of them can be restored, to prevent the case where a job starts from scratch even though it shouldn't.

Does that sum it up?

On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

If we hit the retry limit, abort the job. In our case we will restart from the last savepoint (we, like any production pipeline, take one n times a day), and I would think that should be OK for most folks?

On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Thank you for considering this. If I understand you correctly:

* A CHK pointer in ZK for a CHK state on HDFS was written successfully.
* Some issue restarted the pipeline.
* The NN was unfortunately down and Flink could not retrieve the CHK state from the CHK pointer in ZK.

Before:

* The CHK pointer was removed and the job started from a brand new slate.

After (this fix on 1.4+):

* Do not delete the CHK pointer (it has to be subsumed to be deleted).
* Flink keeps using this CHK pointer (if retries > 0 and until we hit any retry limit) to restore state.
* The NN comes back.
* Flink restores state on the next retry.

I would hope that is the sequence to follow.

Regards.
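For illustration, a minimal sketch of the recovery check Aljoscha points to above. This is an assumption-laden paraphrase, not the actual Flink source; the variable names are illustrative:

    // After attempting to read every completed-checkpoint handle listed in ZooKeeper:
    // if handles exist but none could be retrieved from the DFS, fail recovery
    // instead of silently starting the job from an empty state.
    if (!checkpointStateHandles.isEmpty() && completedCheckpoints.isEmpty()) {
        throw new FlinkException(
            "Could not read any of the " + checkpointStateHandles.size()
                + " checkpoints from storage.");
    }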
On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

I think you might be right. We fixed the problem that checkpoints were dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we still have the problem that if the DFS is not up at all, then it will look as if the job is starting from scratch. The alternative is failing the job, in which case you will also never be able to restore from a checkpoint. What do you think?

Best,
Aljoscha

On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:

Sorry for the late reply.

I created FLINK-8487 [1] to track this problem.

@Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct? Please add more information if you think it is relevant.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8487

2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

Or this one: https://issues.apache.org/jira/browse/FLINK-4815

On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Ping. This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.

On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once on production.

On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

Some relevant JIRA issues for you are:

- https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
- https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
- https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

Best,
Aljoscha

On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

It would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.

Thanks for looking into this!

Best regards,
Fabian

[1] https://lists.apache.org/list.html?user@flink.apache.org
2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

Thank you for confirming.

I think this is a critical bug. In essence any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, because it essentially means that if the remote store is unavailable during a checkpoint then you have lost state (unless of course you have no retries, or an unbounded retry delay that you *hope* the store revives within). Remember, the first retry failure will create new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like "abort_retry_on_chkretrevalfailure".

In our case it is very important that we do not undercount a window — one reason we use Flink and its awesome failure guarantees — as various alarms sound (we do anomaly detection on the time series).

Please create a JIRA ticket for us to follow, or we could do it.

PS: Not aborting on checkpoint failure, up to a configurable limit, is very important too.

On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

I think you're right! And thanks for looking into this so deeply.

With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint, it might have been possible to restore it at a later point, right?

Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.

Best,
Aljoscha

On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I think this is the offending piece. There is a catch-all Exception which, IMHO, should distinguish a recoverable exception from an unrecoverable one.

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (Exception e) {
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
            "checkpoint store.", e);
        // remove the checkpoint with broken state handle
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }
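A minimal sketch of the kind of distinction being argued for — purely illustrative, not the actual fix; the isTransient helper is hypothetical and only stands for "this looks like a temporarily unreachable store" (e.g. an IOException from a NameNode in safe mode):

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (Exception e) {
        if (isTransient(e)) {
            // Recoverable: keep the handle and fail recovery, so the restart/retry
            // loop can attempt the restore again once the store is back.
            throw new FlinkException("Could not retrieve checkpoint "
                + checkpointStateHandle.f1 + "; keeping the state handle for a later retry.", e);
        }
        // Unrecoverable (genuinely broken handle): only then drop it from the store.
        LOG.warn("Removing broken checkpoint state handle {}.", checkpointStateHandle.f1, e);
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }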
On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

So this is the issue; tell us if it is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the exact same checkpoint that was successful before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from HDFS because the NN was down, and conveniently (and I think very wrongly) removed the CHK from being considered and cleaned the pointer (though that also failed since the NN was down, as is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure to retrieve it should have been a hard stop rather than an attempt to work around the exception that ends up starting from a blank state.

org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.

On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Also note that the ZooKeeper recovery (sadly on the same HDFS cluster) showed the same behavior. It has the pointers to the checkpoint (I think that is what it does: keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.

    -rw-r--r--   3 root hadoop   7041   2017-10-04 13:55   /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
    -rw-r--r--   3 root hadoop   7044   2017-10-05 10:07   /flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you :)

On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Another thing I noted was this:

    drwxr-xr-x   - root hadoop   0   2017-10-04 13:54   /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
    drwxr-xr-x   - root hadoop   0   2017-10-05 09:15   /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428

Generally what Flink does, IMHO, is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, and hence the chk-44286 directory. This was (I think) the last checkpoint successfully created before the NN had issues, but, contrary to the usual behaviour, it did not delete this chk-44286. It looks as if it started with a blank slate? Does this strike a chord?
I have worked with GDF and though GDF has >>>>>>>>>>>>>>> some natural >>>>>>>>>>>>>>> pluses Flink's state management is far more advanced. With >>>>>>>>>>>>>>> kafka as a >>>>>>>>>>>>>>> source it negates issues GDF has ( GDF integration with pub/sub >>>>>>>>>>>>>>> is organic >>>>>>>>>>>>>>> and that is to be expected but non FIFO pub/sub is an issue >>>>>>>>>>>>>>> with windows on >>>>>>>>>>>>>>> event time etc ) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Coming back to this issue. We have that >>>>>>>>>>>>>>> same kafka topic feeding a streaming druid datasource and we do >>>>>>>>>>>>>>> not see any >>>>>>>>>>>>>>> issue there, so so data loss on the source, kafka is not >>>>>>>>>>>>>>> applicable. I am >>>>>>>>>>>>>>> totally certain that the "retention" time was not an issue. >>>>>>>>>>>>>>> It is 4 days of retention and we fixed this issue within 30 >>>>>>>>>>>>>>> minutes. We >>>>>>>>>>>>>>> could replay kafka with a new consumer group.id and that >>>>>>>>>>>>>>> worked fine. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Note these properties and see if they strike a chord. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka >>>>>>>>>>>>>>> consumers is the default true. I bring this up to see whether >>>>>>>>>>>>>>> flink will in >>>>>>>>>>>>>>> any circumstance drive consumption on the kafka perceived >>>>>>>>>>>>>>> offset rather >>>>>>>>>>>>>>> than the one in the checkpoint. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been >>>>>>>>>>>>>>> set. The state is big enough though therefore IMHO no way the >>>>>>>>>>>>>>> state is >>>>>>>>>>>>>>> stored along with the meta data in JM ( or ZK ? ) . The reason >>>>>>>>>>>>>>> I bring this >>>>>>>>>>>>>>> up is to make sure when you say that the size has to be less >>>>>>>>>>>>>>> than 1024bytes >>>>>>>>>>>>>>> , you are talking about cumulative state of the pipeine. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> * We have a good sense of SP ( save point ) and CP ( >>>>>>>>>>>>>>> checkpoint ) and certainly understand that they actually are not >>>>>>>>>>>>>>> dissimilar. However in this case there were multiple attempts >>>>>>>>>>>>>>> to restart >>>>>>>>>>>>>>> the pipe before it finally succeeded. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> * Other hdfs related poperties. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= >>>>>>>>>>>>>>> flink_hdfs_root %> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= >>>>>>>>>>>>>>> flink_hdfs_root %> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= >>>>>>>>>>>>>>> flink_hdfs_root %> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Do these make sense ? Is there anything else I should look >>>>>>>>>>>>>>> at. Please also note that it is the second time this has >>>>>>>>>>>>>>> happened. The >>>>>>>>>>>>>>> first time I was vacationing and was not privy to the state of >>>>>>>>>>>>>>> the flink >>>>>>>>>>>>>>> pipeline, but the net effect were similar. The counts for the >>>>>>>>>>>>>>> first window >>>>>>>>>>>>>>> after an internal restart dropped. 
On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

Window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction). Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually recovered successfully (i.e., without an error) from the last checkpoint, all of its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.

Restoring from a savepoint is not so different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and are usually discarded once the job is terminated, while savepoints are retained.

Regarding your question whether a failed checkpoint should cause the job to fail and recover: I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.

Best, Fabian

2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

To add to it, my pipeline is a simple

    keyBy(0)
        .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
        .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
        .reduce(new ReduceFunction(), new WindowFunction())
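Spelled out slightly more, a sketch of what such a keyed window could look like — imports omitted, and the element type (key/count pairs), window size and lateness are assumptions for illustration, not the real job:

    // Assumed input: (key, count) pairs, e.g. the Kafka source mapped upstream.
    DataStream<Tuple2<String, Long>> input = ...;

    input
        .keyBy(0)
        .timeWindow(Time.of(15, TimeUnit.MINUTES))
        .allowedLateness(Time.of(30, TimeUnit.SECONDS))
        .reduce(
            new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    // Incremental pre-aggregation: only the running sum is kept as window state.
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            },
            new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple key, TimeWindow window,
                                  Iterable<Tuple2<String, Long>> reduced,
                                  Collector<Tuple2<String, Long>> out) {
                    // Exactly one pre-aggregated element per key and window is emitted.
                    out.collect(reduced.iterator().next());
                }
            });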
On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello folks,

As far as I know, checkpoint failure should be ignored and retried with potentially larger state. I had this situation:

* HDFS went into safe mode because of NameNode issues.
* An exception was thrown:

    org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
    ..................
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)

* The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.

I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my Kafka consumer that kept advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.

I am using RocksDB and of course have checkpointing turned on.

The questions thus are:

* Should a pipeline be restarted if a checkpoint fails?
* Why, on restart, was the operator state not recreated?
* Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
* And though I am pretty sure: are operators like the window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), is the state managed by Flink?

Thanks.
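For completeness, a minimal sketch of the RocksDB-plus-checkpointing setup described above, reusing the HDFS checkpoint path mentioned earlier in the thread; the checkpoint interval is an illustrative value:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Keyed window state lives in RocksDB; snapshots go to the HDFS checkpoint directory.
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));

    // Checkpoint once a minute, matching the cadence seen in the checkpoint directories.
    env.enableCheckpointing(60_000);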