As in, if there are checkpoint handles in ZK, there should be no reason to start a new job (bad handle, no HDFS connectivity, etc.). Yes, that sums it up.
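
To make sure we mean the same guard, here is roughly the recovery behaviour I am hoping for. This is a simplified sketch, not the actual Flink source: it reuses the names from the snippet quoted further down (retrieveCompletedCheckpoint, checkpointStateHandle, LOG) and assumes an initialCheckpoints list holding the handles found in ZK.

    // Collect whatever completed checkpoints can actually be read back.
    List<CompletedCheckpoint> completedCheckpoints = new ArrayList<>();

    for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
        try {
            CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
            if (completedCheckpoint != null) {
                completedCheckpoints.add(completedCheckpoint);
            }
        } catch (Exception e) {
            // Do NOT remove the handle here; a checkpoint should only be dropped once it is subsumed.
            LOG.warn("Could not retrieve checkpoint {}. Keeping its handle.", checkpointStateHandle.f1, e);
        }
    }

    // Handles exist in ZK but none could be read (bad handle, no hdfs connectivity, ...):
    // fail the recovery, and with it the job, instead of silently starting from scratch.
    if (!initialCheckpoints.isEmpty() && completedCheckpoints.isEmpty()) {
        throw new FlinkException("Could not read any of the " + initialCheckpoints.size()
                + " completed checkpoints from storage.");
    }
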
On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Wait a sec, I just checked out the code again and it seems we already do that:
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>
> If there were some checkpoints but none could be read, we fail recovery.
>
>
> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> That sounds reasonable: We would keep the first fix, i.e. never delete checkpoints if they're "corrupt", only when they're subsumed. Additionally, we fail the job if there are some checkpoints in ZooKeeper but none of them can be restored, to prevent the case where a job starts from scratch even though it shouldn't.
>
> Does that sum it up?
>
> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> If we hit the retry limit, abort the job. In our case we will restart from the last SP (we, as any production pipeline does, do it n times a day), and that I would think should be OK for most folks?
>
> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Thank you for considering this. If I understand you correctly:
>>
>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>> * Some issue restarted the pipeline.
>> * The NN was down unfortunately and flink could not retrieve the CHK state from the CHK pointer on ZK.
>>
>> Before
>>
>> * The CHK pointer was being removed and the job started from a brand new slate.
>>
>> After (this fix on 1.4+)
>>
>> * Do not delete the CHK pointer (it has to be subsumed to be deleted).
>> * Flink keeps using this CHK pointer (if retry is > 0 and till we hit any retry limit) to restore state.
>> * NN comes back.
>> * Flink restores state on the next retry.
>>
>> I would hope that is the sequence to follow.
>>
>> Regards.
>>
>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi Vishal,
>>>
>>> I think you might be right. We fixed the problem that checkpoints were dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we still have the problem that if the DFS is not up at all then it will look as if the job is starting from scratch. However, the alternative is failing the job, in which case you will also never be able to restore from a checkpoint. What do you think?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Sorry for the late reply.
>>>
>>> I created FLINK-8487 [1] to track this problem.
>>>
>>> @Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct?
>>> Please add more information if you think it is relevant.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>
>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> Or this one:
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>>
>>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> ping.
>>>>>
>>>>> This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.
>>>>>
>>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Folks, sorry for being late on this. Can somebody with knowledge of this code base create a jira issue for the above? We have seen this more than once on production.
>>>>>>
>>>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> Some relevant Jira issues for you are:
>>>>>>>
>>>>>>> - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>>>>>>> - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>>>>>>> - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> it would be great if you could create a JIRA ticket with Blocker priority.
>>>>>>> Please add all relevant information of your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>>>>>>>
>>>>>>> Thanks for looking into this!
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Fabian
>>>>>>>
>>>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>>>
>>>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> Thank you for confirming.
>>>>>>>>
>>>>>>>> I think this is a critical bug. In essence any checkpoint store (hdfs/S3/File) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, b'coz essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless of course you have a retry of none or an unbounded retry delay, a delay that you *hope* the store revives in). Remember, the first retry failure will cause new state, according to the code as written, if the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".
>>>>>>>>
>>>>>>>> In our case it is very important that we do not undercount a window, one reason we use flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).
>>>>>>>>
>>>>>>>> Please create a jira ticket for us to follow, or we could do it.
>>>>>>>>
>>>>>>>> PS Not aborting on checkpointing, till a configurable limit, is very important too.
>>>>>>>>
>>>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>>>>
>>>>>>>>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>>>>>>>>>
>>>>>>>>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.
>>>>>>>>>
>>>>>>>>> try {
>>>>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>>>>     if (completedCheckpoint != null) {
>>>>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>>>>     }
>>>>>>>>> } catch (Exception e) {
>>>>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>>>>         "checkpoint store.", e);
>>>>>>>>>     // remove the checkpoint with broken state handle
>>>>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> So this is the issue, and tell us if this is wrong. ZK had some state (backed by hdfs) that referred to a checkpoint (the same exact last successful checkpoint that succeeded before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from hdfs b'coz the NN was down, and conveniently (and I think very wrongly) removed the CHK from being considered and cleaned the pointer (though that cleanup too failed, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on hdfs, and a failure in retrieving it should have been a full stop, not an exception to be worked around by starting from a blank state.
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Also note that the zookeeper recovery (sadly on the same hdfs cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>>>>>>>>>>
>>>>>>>>>>> -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>>> -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>>>
>>>>>>>>>>> This is getting a little interesting.
>>>>>>>>>>> What say you :)
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Another thing I noted was this:
>>>>>>>>>>>>
>>>>>>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>>>> drwxr-xr-x   - root hadoop   0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>>>>
>>>>>>>>>>>> Generally what Flink does, IMHO, is replace the checkpoint directory with a new one. I see it happening now; every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, and hence the chk-44286 directory remained. chk-44286 was (I think) the last checkpoint successfully created before the NN had issues, but unlike the usual behaviour it was not deleted. It looks as if the job started with a blank slate ???????? Does this strike a chord ?????
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time etc.).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Coming back to this issue. We have that same kafka topic feeding a streaming druid datasource and we do not see any issue there, so data loss at the source (kafka) is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is the default, true. I bring this up to see whether flink will in any circumstance drive consumption from the kafka-perceived offset rather than the one in the checkpoint.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, therefore IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they actually are not dissimilar.
>>>>>>>>>>>>> However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>>>>>>>>>>
>>>>>>>>>>>>> * Other hdfs-related properties:
>>>>>>>>>>>>>
>>>>>>>>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you for your patience and regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Vishal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>>>>>>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding your question if a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As far as I know, checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * hdfs went into safe mode b'coz of Name Node issues
>>>>>>>>>>>>>>>> * an exception was thrown
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>>> ..................
>>>>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint failures, once the hdfs issues were resolved.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned on.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a save point work as expected?
>>>>>>>>>>>>>>>> * And though I am pretty sure, are operators like the Window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by flink?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
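
PS For context, this is roughly how we wire up checkpointing and restarts for this job (a trimmed sketch against the Flink 1.3/1.4 style APIs; the class name, checkpoint URI and restart values are placeholders, not our exact production settings):

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // RocksDB state backend writing checkpoints to HDFS, one checkpoint per minute.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
            env.enableCheckpointing(60_000);

            // Retain checkpoints on cancellation so there is always something to restore from.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // Bounded restarts: if recovery keeps failing (say the NN is still down once the
            // attempts are exhausted), the job aborts and we resume from the last savepoint
            // instead of running on from a blank state.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));

            // ... build the keyBy / timeWindow / allowedLateness / reduce graph described
            // above, then env.execute(...).
        }
    }
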