Hi Vishal,

Some relevant Jira issues for you are:

 - FLINK-4808 (https://issues.apache.org/jira/browse/FLINK-4808): Allow 
skipping failed checkpoints
 - FLINK-4815 (https://issues.apache.org/jira/browse/FLINK-4815): Automatic 
fallback to earlier checkpoint when checkpoint restore fails
 - FLINK-7783 (https://issues.apache.org/jira/browse/FLINK-7783): Don't always 
remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
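
To make the direction of the last two issues a bit more concrete, here is a 
rough sketch. It is not Flink's actual code: CheckpointLoader, 
CorruptCheckpointException and recoverLatest are hypothetical names made up 
for illustration. The idea is only that a checkpoint which can never be read 
is skipped in favour of an earlier one, while an IOException (e.g. HDFS 
temporarily down) aborts recovery without removing any checkpoint pointer:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Sketch only: all names here are hypothetical, none are Flink APIs.
final class CheckpointRecoverySketch {

    /** Hypothetical: the checkpoint metadata exists but can never be read. */
    static final class CorruptCheckpointException extends Exception {
        CorruptCheckpointException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for reading checkpoint metadata from the store. */
    interface CheckpointLoader {
        String load(long checkpointId) throws IOException, CorruptCheckpointException;
    }

    /**
     * Returns the newest checkpoint that can be loaded, or null if none can.
     * An IOException (store unreachable) propagates to the caller and nothing
     * is removed, so the same checkpoints can be tried again later.
     */
    static String recoverLatest(List<Long> idsNewestFirst, CheckpointLoader loader)
            throws IOException {
        for (long id : idsNewestFirst) {
            try {
                return loader.load(id);
            } catch (CorruptCheckpointException e) {
                // Permanently unreadable: fall back to the next older checkpoint.
                System.err.println("Skipping checkpoint " + id + ": " + e.getMessage());
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        // Checkpoint 2 is corrupt, so recovery falls back to checkpoint 1.
        CheckpointLoader loader = id -> {
            if (id == 2L) {
                throw new CorruptCheckpointException("metadata unreadable");
            }
            return "chk-" + id;
        };
        System.out.println(recoverLatest(Arrays.asList(2L, 1L), loader)); // chk-1
    }
}
```

With this shape, a store outage would surface as a failed recovery attempt 
that can be retried later, rather than as a silent restart from empty state.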

Best,
Aljoscha

> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Vishal,
> 
> it would be great if you could create a JIRA ticket with Blocker priority.
> Please add all relevant information of your detailed analysis, add a link to 
> this email thread (see [1] for the web archive of the mailing list), and post 
> the id of the JIRA issue here.
> 
> Thanks for looking into this!
> 
> Best regards,
> Fabian
> 
> [1] https://lists.apache.org/list.html?user@flink.apache.org
> 
> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
> Thank you for confirming. 
>        
> 
> I think this is a critical bug. In essence, any checkpoint store 
> (hdfs/S3/File) will lose state if it is unavailable at resume. This becomes 
> all the more painful with your confirming that "failed checkpoints killing 
> the job" is expected, because it essentially means that if the remote store is 
> unavailable during a checkpoint, then you have lost state (unless, of course, 
> you have configured no retries or an unbounded retry delay, a delay within 
> which you hope the store revives). Remember that, according to the code as 
> written, the first retry failure will result in new state iff the remote 
> store is down. We would rather have a configurable property that establishes 
> our desire to abort, something like "abort_retry_on_chkretrevalfailure".
> 
> 
> In our case it is very important that we do not undercount a window, which is 
> one reason we use flink and its awesome failure guarantees, as various alarms 
> sound (we do anomaly detection on the time series).
> 
> Please create a JIRA ticket for us to follow, or we could do it.
> 
> 
> PS: Not aborting on checkpoint failure, up to a configurable limit, is very 
> important too.
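
For what it's worth, the "configurable limit" mentioned above could look 
roughly like the following. This is only a sketch and not an existing Flink 
setting; the class CheckpointFailureTolerance and its methods are hypothetical 
names. It counts consecutive checkpoint failures and only signals that the job 
should fail once the configured limit is exceeded:

```java
// Sketch only: a hypothetical helper, not part of Flink.
final class CheckpointFailureTolerance {

    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    CheckpointFailureTolerance(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 0) {
            throw new IllegalArgumentException("limit must be >= 0");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Records a failed checkpoint; returns true once the limit is exceeded. */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > maxConsecutiveFailures;
    }

    /** A completed checkpoint resets the count. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    public static void main(String[] args) {
        CheckpointFailureTolerance tolerance = new CheckpointFailureTolerance(2);
        System.out.println(tolerance.onCheckpointFailure()); // false
        System.out.println(tolerance.onCheckpointFailure()); // false
        System.out.println(tolerance.onCheckpointFailure()); // true
    }
}
```

A limit of 0 would reproduce the behaviour discussed in this thread, where the 
first failed checkpoint fails the job.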
> 
> 
> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi Vishal,
> 
> I think you're right! And thanks for looking into this so deeply. 
> 
> With your last mail you're basically saying that the checkpoint could not be 
> restored because your HDFS was temporarily down. If Flink had not deleted 
> that checkpoint it might have been possible to restore it at a later point, 
> right?
> 
> Regarding failed checkpoints killing the job: yes, this is currently the 
> expected behaviour but there are plans to change this.
> 
> Best,
> Aljoscha
> 
>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> 
>> I think this is the offending piece. There is a catch-all Exception handler, 
>> which IMHO should distinguish a recoverable exception from an unrecoverable one.
>> 
>> 
>>     try {
>>         completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>         if (completedCheckpoint != null) {
>>             completedCheckpoints.add(completedCheckpoint);
>>         }
>>     } catch (Exception e) {
>>         LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>             "checkpoint store.", e);
>> 
>>         // remove the checkpoint with broken state handle
>>         removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>     }
>> 
>> 
>> 
>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> So this is the issue; tell us if we have it wrong. ZK had some state (backed 
>> by hdfs) that referred to a checkpoint (the exact same last successful 
>> checkpoint that completed before the NN failed us). When the JM tried to 
>> recreate the state, it failed to retrieve the CHK handle from hdfs because 
>> the NN was down, and conveniently (and I think very wrongly) removed the CHK 
>> from consideration and cleaned up the pointer (though that cleanup failed 
>> too, since the NN was down, as is obvious from the dangling file in 
>> recovery). The metadata itself was on hdfs, and a failure to retrieve it 
>> should have been a stop-all exception, rather than trying to do magic and 
>> starting from a blank state.
>> 
>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 
>> from state handle under /0000000000000044286. This indicates that the 
>> retrieved state handle is broken. Try cleaning the state handle store.
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Also note that the zookeeper recovery directory (sadly on the same hdfs 
>> cluster) showed the same behavior. It had the pointers to the chk point (I 
>> think that is what it does: it keeps metadata about where the checkpoint is, 
>> etc.). It too kept the recovery file from the failed state.
>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>> 
>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>> 
>> This is getting a little interesting. What say you :)
>> 
>> 
>> 
>> 
>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Another thing I noticed was this:
>> 
>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>> 
>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>> 
>> 
>> 
>> Generally, as far as I can tell, Flink replaces the chk point directory with 
>> a new one. I see it happening now: every minute it replaces the old 
>> directory. In this job's case, however, it did not delete the directory from 
>> 2017-10-04 13:54, i.e. chk-44286. This chk-44286 was (I think) the last 
>> checkpoint successfully created before the NN had issues, but contrary to 
>> what usually happens it was not deleted. It looks as if the job started with 
>> a blank slate. Does this strike a chord?
>> 
>> 
>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Hello Fabian,
>> 
>> First of all, congratulations on this fabulous framework. I have worked 
>> with GDF, and though GDF has some natural pluses, Flink's state management is 
>> far more advanced. With kafka as a source it negates issues GDF has (GDF 
>> integration with pub/sub is organic, and that is to be expected, but non-FIFO 
>> pub/sub is an issue with windows on event time, etc.).
>> 
>> Coming back to this issue: we have that same kafka topic feeding a streaming 
>> druid datasource and we do not see any issue there, so data loss on the 
>> source, kafka, is not applicable. I am totally certain that the "retention" 
>> time was not an issue. It is 4 days of retention and we fixed this issue 
>> within 30 minutes. We could replay kafka with a new consumer group.id and 
>> that worked fine.
>> 
>> 
>> Note these properties and see if they strike a chord.
>> 
>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is at its 
>> default, true. I bring this up to see whether flink will under any 
>> circumstance drive consumption from the kafka-perceived offset rather than 
>> the one in the checkpoint.
>> 
>> * The state.backend.fs.memory-threshold: 0 has not been set. The state is 
>> big enough, though, so IMHO there is no way the state is stored along with 
>> the metadata in the JM (or ZK?). The reason I bring this up is to make sure 
>> that when you say the size has to be less than 1024 bytes, you are talking 
>> about the cumulative state of the pipeline.
>> 
>> * We have a good sense of SP ( save point )  and CP ( checkpoint ) and 
>> certainly understand that they actually are not dissimilar. However in this 
>> case there were multiple attempts to restart the pipe before it finally 
>> succeeded. 
>> 
>> * Other hdfs related properties:
>>  
>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>> 
>> 
>> Do these make sense? Is there anything else I should look at? Please also 
>> note that this is the second time this has happened. The first time I was 
>> on vacation and was not privy to the state of the flink pipeline, but the 
>> net effect was similar: the counts for the first window after an internal 
>> restart dropped.
>> 
>> 
>> 
>> 
>> Thank you for your patience and regards,
>> 
>> Vishal
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi Vishal,
>> 
>> window operators are always stateful because the operator needs to remember 
>> previously received events (WindowFunction) or intermediate results 
>> (ReduceFunction).
>> Given the program you described, a checkpoint should include the Kafka 
>> consumer offset and the state of the window operator. If the program 
>> eventually successfully (i.e., without an error) recovered from the last 
>> checkpoint, all its state should have been restored. Since the last 
>> checkpoint was before HDFS went into safe mode, the program would have been 
>> reset to that point. If the Kafka retention time is less than the time it 
>> took to fix HDFS you would have lost data because it would have been removed 
>> from Kafka. If that's not the case, we need to investigate this further 
>> because a checkpoint recovery must not result in state loss.
>> 
>> Restoring from a savepoint is not so much different from automatic 
>> checkpoint recovery. Given that you have a completed savepoint, you can 
>> restart the job from that point. The main difference is that checkpoints are 
>> only used for internal recovery and usually discarded once the job is 
>> terminated while savepoints are retained. 
>> 
>> Regarding your question if a failed checkpoint should cause the job to fail 
>> and recover I'm not sure what the current status is.
>> Stefan (in CC) should know what happens if a checkpoint fails.
>> 
>> Best, Fabian
>> 
>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>> To add to it, my pipeline is a simple 
>> 
>> keyBy(0)
>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>         .reduce(new ReduceFunction(), new WindowFunction())
>> 
>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Hello folks,
>> 
>> As far as I know, a checkpoint failure should be ignored and the checkpoint 
>> retried with potentially larger state. I had this situation:
>> 
>> * hdfs went into safe mode because of Name Node issues
>> * an exception was thrown
>> 
>>     
>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>     ..................
>> 
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>> 
>> * The pipeline came back after a few restarts and checkpoint failures, after 
>> the hdfs issues were resolved.
>> 
>> I would not have worried about the restart, but it was evident that I lost 
>> my operator state. Either my kafka consumer kept on advancing its offset 
>> between a start and the next checkpoint failure (a minute's worth), or the 
>> operator state that held partial aggregates was lost. I have a 15 minute 
>> window of counts on a keyed operator.
>> 
>> I am using RocksDB and of course have checkpointing turned on.
>> 
>> The questions thus are
>> 
>> * Should a pipeline be restarted if a checkpoint fails?
>> * Why was the operator state not recreated on restart?
>> * Does the nature of the exception thrown have anything to do with this, 
>> given that suspend and resume from a savepoint work as expected?
>> * And though I am pretty sure, are operators like the Window operator 
>> stateful by default, so that if I have timeWindow(Time.of(window_size, 
>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the 
>> state is managed by flink?
>> 
>> Thanks.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 
> 
