Did you see my second mail?

> On 24. Jan 2018, at 12:50, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> As in, if there are chk handles in zk, there should be no reason to start a new 
> job ( bad handle, no hdfs connectivity etc ),
>  yes that sums it up.
> 
> On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Wait a sec, I just checked out the code again and it seems we already do 
> that: 
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
> 
> If there were some checkpoints but none could be read we fail recovery.
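
A minimal sketch of the rule stated above, assuming nothing about the real Flink classes: unreadable handles are skipped but never deleted, and recovery fails when handles exist but none can be read. The names `RecoverySketch`, `RetrievalException` and `recover` are hypothetical and only illustrate the idea; this is not the code behind the link.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class RecoverySketch {

    /** Hypothetical stand-in for a failure to read checkpoint state from the DFS. */
    public static class RetrievalException extends RuntimeException {
        public RetrievalException(String message) {
            super(message);
        }
    }

    /**
     * Tries to load the checkpoint behind every handle. Unreadable handles are
     * skipped but never deleted. If handles exist and none of them can be read,
     * recovery fails instead of returning an empty list.
     */
    public static <H, C> List<C> recover(List<H> handles, Function<H, C> retrieve) {
        List<C> recovered = new ArrayList<>();
        for (H handle : handles) {
            try {
                recovered.add(retrieve.apply(handle));
            } catch (RetrievalException e) {
                // Keep the handle: the store may only be temporarily unavailable.
            }
        }
        if (!handles.isEmpty() && recovered.isEmpty()) {
            throw new IllegalStateException(
                "Found " + handles.size() + " checkpoint handle(s) but could not read any of them");
        }
        return recovered;
    }
}
```

With an empty handle list the method returns an empty list, which is the only case where starting from scratch is legitimate in this sketch.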
> 
> 
>> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> 
>> That sounds reasonable: We would keep the first fix, i.e. never delete 
>> checkpoints if they're "corrupt", only when they're subsumed. Additionally, 
>> we fail the job if there are some checkpoints in ZooKeeper but none of them 
>> can be restored to prevent the case where a job starts from scratch even 
>> though it shouldn't.
>> 
>> Does that sum it up?
>> 
>>> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com 
>>> <mailto:vishal.santo...@gmail.com>> wrote:
>>> 
>>> If we hit the retry limit, abort the job. In our case we will restart from 
>>> the last SP ( we, as any production pipe, do it n times a day ) and that 
>>> I would think should be OK for most folks ?
>>> 
>>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi 
>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>> Thank you for considering this. If I understand you correctly:
>>> 
>>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>>> * Some issue restarted the pipeline.
>>> * The NN was down unfortunately and flink could not retrieve the  CHK state 
>>> from the CHK pointer on ZK.
>>> 
>>> Before 
>>> 
>>> * The CHK pointer was being removed and the job started from a brand new 
>>> slate.
>>> 
>>> After ( this fix on 1.4 +) 
>>> 
>>> * do not delete the CHK pointer ( It has to be subsumed to be deleted ). 
>>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit any 
>>> retry limit ) to restore state
>>> * NN comes back 
>>> * Flink restores state on the next retry.
>>> 
>>> I would hope that is the sequence to follow.
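
The "After" sequence above can be sketched as a bounded retry loop. This is only an illustration under assumptions: `RestoreRetrySketch`, `restoreWithRetry`, `maxAttempts` and `delayMillis` are hypothetical names, not Flink APIs or configuration keys. The point is that the restore action is retried against the same pointer, and the last failure is rethrown once the limit is hit so the job aborts instead of starting empty.

```java
import java.util.concurrent.Callable;

public class RestoreRetrySketch {

    /**
     * Calls restore up to maxAttempts times. Returns the restored value on the
     * first success. Rethrows the last failure once the attempts run out, so
     * the caller aborts rather than continuing with empty state.
     */
    public static <T> T restoreWithRetry(Callable<T> restore, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return restore.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Wait before the next attempt; the store may come back.
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

If the NN comes back before the limit is reached, the next attempt restores state from the untouched pointer.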
>>> 
>>> Regards.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org 
>>> <mailto:aljos...@apache.org>> wrote:
>>> Hi Vishal,
>>> 
>>> I think you might be right. We fixed the problem that checkpoints were 
>>> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we 
>>> still have 
>>> the problem that if the DFS is not up at all then it will look as if the 
>>> job is starting from scratch. However, the alternative is failing the job, 
>>> in which case you will also never be able to restore from a checkpoint. 
>>> What do you think?
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> 
>>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com 
>>>> <mailto:fhue...@gmail.com>> wrote:
>>>> 
>>>> Sorry for the late reply.
>>>> 
>>>> I created FLINK-8487 [1] to track this problem
>>>> 
>>>> @Vishal, can you have a look and check if I forgot some details? I logged 
>>>> the issue for Flink 1.3.2, is that correct?
>>>> Please add more information if you think it is relevant.
>>>> 
>>>> Thanks,
>>>> Fabian
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-8487 
>>>> 
>>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com 
>>>> <mailto:vishal.santo...@gmail.com>>:
>>>> Or this one 
>>>> 
>>>> https://issues.apache.org/jira/browse/FLINK-4815 
>>>> 
>>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi 
>>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>>> ping. 
>>>> 
>>>>     This happened again on production and it seems reasonable to abort 
>>>> when a checkpoint is not found rather than behave as if it is a brand new 
>>>> pipeline.  
>>>> 
>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi 
>>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>>> Folks sorry for being late on this. Can somebody with knowledge of 
>>>> this code base create a jira issue for the above ? We have seen this more 
>>>> than once on production.
>>>> 
>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org 
>>>> <mailto:aljos...@apache.org>> wrote:
>>>> Hi Vishal,
>>>> 
>>>> Some relevant Jira issues for you are:
>>>> 
>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed 
>>>> checkpoints
>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to 
>>>> earlier checkpoint when checkpoint restore fails
>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove 
>>>> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>> 
>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com 
>>>>> <mailto:fhue...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Vishal,
>>>>> 
>>>>> it would be great if you could create a JIRA ticket with Blocker priority.
>>>>> Please add all relevant information of your detailed analysis, add a link 
>>>>> to this email thread (see [1] for the web archive of the mailing list), 
>>>>> and post the id of the JIRA issue here.
>>>>> 
>>>>> Thanks for looking into this!
>>>>> 
>>>>> Best regards,
>>>>> Fabian
>>>>> 
>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org 
>>>>> 
>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com 
>>>>> <mailto:vishal.santo...@gmail.com>>:
>>>>> Thank you for confirming. 
>>>>>        
>>>>> 
>>>>>  I think this is a critical bug. In essence any checkpoint store ( 
>>>>> hdfs/S3/File ) will lose state if it is unavailable at resume. This 
>>>>> becomes all the more painful with your confirming "failed checkpoints 
>>>>> killing the job", b'coz essentially it means that if the remote store 
>>>>> is unavailable during a checkpoint then you have lost state ( unless of 
>>>>> course you have no retries or an unbounded retry delay, a delay within 
>>>>> which you hope the store revives ). Remember, the first retry failure 
>>>>> will cause new state according to the code as written iff the remote 
>>>>> store is down. We would rather have a configurable property that 
>>>>> establishes our desire to abort, something like 
>>>>> "abort_retry_on_chkretrevalfailure".
>>>>> 
>>>>> 
>>>>> In our case it is very important that we do not undercount a window, one 
>>>>> reason we use flink and its awesome failure guarantees, as various 
>>>>> alarms sound ( we do anomaly detection on the time series ).
>>>>> 
>>>>> Please create a jira ticket for us to follow or we could do it.
>>>>> 
>>>>> 
>>>>> PS: Not aborting on a checkpoint failure, until a configurable limit is 
>>>>> reached, is very important too.
>>>>> 
>>>>> 
>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org 
>>>>> <mailto:aljos...@apache.org>> wrote:
>>>>> Hi Vishal,
>>>>> 
>>>>> I think you're right! And thanks for looking into this so deeply. 
>>>>> 
>>>>> With your last mail you're basically saying that the checkpoint could not 
>>>>> be restored because your HDFS was temporarily down. If Flink had not 
>>>>> deleted that checkpoint it might have been possible to restore it at a 
>>>>> later point, right?
>>>>> 
>>>>> Regarding failed checkpoints killing the job: yes, this is currently the 
>>>>> expected behaviour but there are plans to change this.
>>>>> 
>>>>> Best,
>>>>> Aljoscha
>>>>> 
>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com 
>>>>>> <mailto:vishal.santo...@gmail.com>> wrote:
>>>>>> 
>>>>>> I think this is the offending piece. There is a catch-all Exception, 
>>>>>> which IMHO should distinguish a recoverable exception from an 
>>>>>> unrecoverable one. 
>>>>>> 
>>>>>> 
>>>>>>                  try {
>>>>>>                          completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>                          if (completedCheckpoint != null) {
>>>>>>                                  completedCheckpoints.add(completedCheckpoint);
>>>>>>                          }
>>>>>>                  } catch (Exception e) {
>>>>>>                          LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>                                  "checkpoint store.", e);
>>>>>> 
>>>>>>                          // remove the checkpoint with broken state handle
>>>>>>                          removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>                  }
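
One way to picture the distinction asked for above, as a rough sketch only: classify the failure before deciding what to do with the handle. Which exception types count as unrecoverable is purely an assumption made here for illustration (Java serialization errors are treated as permanent, everything else as possibly transient); `RetrievalFailureSketch`, `Action` and `classify` are hypothetical names and say nothing about how Flink itself decides.

```java
import java.io.ObjectStreamException;

public class RetrievalFailureSketch {

    /** What to do with a checkpoint handle whose state could not be read. */
    public enum Action { KEEP_HANDLE_AND_RETRY, DISCARD_HANDLE }

    /**
     * Assumption for this sketch: only corrupt or incompatible serialized data
     * is permanent. Any other failure, such as a plain IOException from an
     * unreachable store, may be transient, so the handle is kept.
     */
    public static Action classify(Exception e) {
        if (e instanceof ObjectStreamException || e instanceof ClassNotFoundException) {
            return Action.DISCARD_HANDLE;
        }
        return Action.KEEP_HANDLE_AND_RETRY;
    }
}
```

The default branch is deliberately the conservative one: an unknown failure keeps the handle, since deleting a pointer cannot be undone.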
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi 
>>>>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>>>>> So this is the issue, and tell us if this is wrong. ZK had some state ( 
>>>>>> backed by hdfs ) that referred to a checkpoint ( the exact last 
>>>>>> checkpoint that was successful before NN screwed us ). When the JM tried 
>>>>>> to recreate the state, it failed to retrieve the CHK handle from hdfs 
>>>>>> b'coz NN was down, and conveniently ( and I think very wrongly ) removed 
>>>>>> the CHK from being considered and cleaned the pointer ( though that 
>>>>>> failed too as NN was down, as is obvious from the dangling file in 
>>>>>> recovery ). The metadata itself was on hdfs, and a failure in retrieving 
>>>>>> it should have been a stop-all, not-going-to-try-doing-magic exception 
>>>>>> rather than a start from a blank state.
>>>>>> 
>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 
>>>>>> 44286 from state handle under /0000000000000044286. This indicates that 
>>>>>> the retrieved state handle is broken. Try cleaning the state handle 
>>>>>> store.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi 
>>>>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>>>>> Also note that the zookeeper recovery dir ( sadly on the same hdfs 
>>>>>> cluster ) showed the same behavior. It had the pointers to the chk 
>>>>>> point ( I think that is what it does, keeps metadata of where the 
>>>>>> checkpoint is etc ). It too decided to keep the recovery file from the 
>>>>>> failed state.
>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55 
>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>> 
>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07 
>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>> 
>>>>>> This is getting a little interesting. What say you :)
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi 
>>>>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>>>>> Another thing I noted was this thing
>>>>>> 
>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54 
>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>> 
>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15 
>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Generally what Flink does IMHO is that it replaces the chk point 
>>>>>> directory with a new one. I see it happening now. Every minute it 
>>>>>> replaces the old directory. In this job's case however, it did not 
>>>>>> delete the 2017-10-04 13:54 entry and hence the chk-44286 directory. This 
>>>>>> was the last checkpoint ( I think ) successfully created before NN had 
>>>>>> issues but, unlike what is usual, Flink did not delete this chk-44286. It 
>>>>>> looks as if it started with a blank slate? Does this strike a chord?
>>>>>> 
>>>>>> 
>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi 
>>>>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>>>>> Hello Fabian, 
>>>>>>                       First of all congratulations on this fabulous 
>>>>>> framework. I have worked with GDF and though GDF has some natural pluses 
>>>>>> Flink's state management is far more advanced. With kafka as a source it 
>>>>>> negates issues GDF has ( GDF integration with pub/sub is organic and 
>>>>>> that is to be expected but non FIFO pub/sub is an issue with windows on 
>>>>>> event time etc )
>>>>>> 
>>>>>> Coming back to this issue. We have that same kafka 
>>>>>> topic feeding a streaming druid datasource and we do not see any issue 
>>>>>> there, so data loss on the source, kafka, is not applicable. I am 
>>>>>> totally certain that the "retention" time was not an issue. It is 4 days 
>>>>>> of retention and we fixed this issue within 30 minutes. We could replay 
>>>>>> kafka with a new consumer group.id and that worked fine. 
>>>>>> 
>>>>>> 
>>>>>> Note these properties and see if they strike a chord.
>>>>>> 
>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is the 
>>>>>> default true. I bring this up to see whether flink will in any 
>>>>>> circumstance drive consumption on the kafka perceived offset rather than 
>>>>>> the one in the checkpoint.
>>>>>> 
>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set. The state 
>>>>>> is big enough though, therefore IMHO there is no way the state is stored 
>>>>>> along with the meta data in JM ( or ZK ? ). The reason I bring this up is 
>>>>>> to make sure that when you say that the size has to be less than 1024 
>>>>>> bytes, you are talking about the cumulative state of the pipeline.
>>>>>> 
>>>>>> * We have a good sense of SP ( save point )  and CP ( checkpoint ) and 
>>>>>> certainly understand that they actually are not dissimilar. However in 
>>>>>> this case there were multiple attempts to restart the pipe before it 
>>>>>> finally succeeded. 
>>>>>> 
>>>>>> * Other hdfs related properties.
>>>>>>  
>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>> 
>>>>>> 
>>>>>> Do these make sense ? Is there anything else I should look at.  Please 
>>>>>> also note that it is the second time this has happened. The first time I 
>>>>>> was vacationing and was not privy to the state of the flink pipeline, 
>>>>>> but the net effect was similar. The counts for the first window after 
>>>>>> an internal restart dropped. 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Thank you for your patience and regards,
>>>>>> 
>>>>>> Vishal
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com 
>>>>>> <mailto:fhue...@gmail.com>> wrote:
>>>>>> Hi Vishal,
>>>>>> 
>>>>>> window operators are always stateful because the operator needs to 
>>>>>> remember previously received events (WindowFunction) or intermediate 
>>>>>> results (ReduceFunction).
>>>>>> Given the program you described, a checkpoint should include the Kafka 
>>>>>> consumer offset and the state of the window operator. If the program 
>>>>>> eventually successfully (i.e., without an error) recovered from the last 
>>>>>> checkpoint, all its state should have been restored. Since the last 
>>>>>> checkpoint was before HDFS went into safe mode, the program would have 
>>>>>> been reset to that point. If the Kafka retention time is less than the 
>>>>>> time it took to fix HDFS you would have lost data because it would have 
>>>>>> been removed from Kafka. If that's not the case, we need to investigate 
>>>>>> this further because a checkpoint recovery must not result in state loss.
>>>>>> 
>>>>>> Restoring from a savepoint is not so much different from automatic 
>>>>>> checkpoint recovery. Given that you have a completed savepoint, you can 
>>>>>> restart the job from that point. The main difference is that checkpoints 
>>>>>> are only used for internal recovery and usually discarded once the job 
>>>>>> is terminated while savepoints are retained. 
>>>>>> 
>>>>>> Regarding your question if a failed checkpoint should cause the job to 
>>>>>> fail and recover I'm not sure what the current status is.
>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>> 
>>>>>> Best, Fabian
>>>>>> 
>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com 
>>>>>> <mailto:vishal.santo...@gmail.com>>:
>>>>>> To add to it, my pipeline is a simple 
>>>>>> 
>>>>>> keyBy(0)
>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>> 
>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi 
>>>>>> <vishal.santo...@gmail.com <mailto:vishal.santo...@gmail.com>> wrote:
>>>>>> Hello folks,
>>>>>> 
>>>>>> As far as I know checkpoint failure should be ignored and retried with 
>>>>>> potentially larger state. I had this situation
>>>>>> 
>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>> * exception was thrown
>>>>>> 
>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>     ..................
>>>>>> 
>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>> 
>>>>>> * The pipeline came back after a few restarts and checkpoint failures, 
>>>>>> after the hdfs issues were resolved.
>>>>>> 
>>>>>> I would not have worried about the restart, but it was evident that I 
>>>>>> lost my operator state. Either it was my kafka consumer that kept on 
>>>>>> advancing its offset between a start and the next checkpoint failure ( 
>>>>>> a minute's worth ) or the operator that had partial aggregates was 
>>>>>> lost. I have a 15 minute window of counts on a keyed operator.
>>>>>> 
>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>> 
>>>>>> The questions thus are
>>>>>> 
>>>>>> * Should a pipeline be restarted if a checkpoint fails ?
>>>>>> * Why, on restart, was the operator state not recreated ?
>>>>>> * Does the nature of the exception thrown have anything to do with any of 
>>>>>> this, b'coz suspend and resume from a save point work as expected ?
>>>>>> * And though I am pretty sure, are operators like the Window operator 
>>>>>> stateful by default and thus if I have timeWindow(Time.of(window_size, 
>>>>>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), 
>>>>>> the state is managed by flink ?
>>>>>> 
>>>>>> Thanks.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>> 
> 
> 
