Folks sorry for being late on this. Can some body with the knowledge of
this code base create a jira issue for the above ? We have seen this more
than once on production.

On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Vishal,
>
> Some relevant Jira issues for you are:
>
>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
> failed checkpoints
>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback
> to earlier checkpoint when checkpoint restore fails
>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove
> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>
> Best,
> Aljoscha
>
>
> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Vishal,
>
> it would be great if you could create a JIRA ticket with Blocker priority.
> Please add all relevant information of your detailed analysis, add a link
> to this email thread (see [1] for the web archive of the mailing list), and
> post the id of the JIRA issue here.
>
> Thanks for looking into this!
>
> Best regards,
> Fabian
>
> [1] https://lists.apache.org/list.html?user@flink.apache.org
>
> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Thank you for confirming.
>>
>>
>>  I think this is a critical bug. In essence any checkpoint store (
>> hdfs/S3/File)  will loose state if it is unavailable at resume. This
>> becomes all the more painful with your confirming that  "failed
>> checkpoints killing the job"  b'coz essentially it mean that if remote
>> store in unavailable  during checkpoint than you have lost state ( till of
>> course you have a retry of none or an unbounded retry delay, a delay that
>> you *hope* the store revives in ) .. Remember  the first retry failure
>>  will cause new state according the code as written iff the remote store is
>> down. We would rather have a configurable property that establishes  our
>> desire to abort something like a "abort_retry_on_chkretrevalfailure"
>>
>>
>> In our case it is very important that we do not undercount a window, one
>> reason we use flink and it's awesome failure guarantees, as various alarms
>> sound ( we do anomaly detection on the time series ).
>>
>> Please create a jira ticket for us to follow or we could do it.
>>
>>
>> PS Not aborting on checkpointing, till a configurable limit is very
>> important too.
>>
>>
>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> I think you're right! And thanks for looking into this so deeply.
>>>
>>> With your last mail your basically saying, that the checkpoint could not
>>> be restored because your HDFS was temporarily down. If Flink had not
>>> deleted that checkpoint it might have been possible to restore it at a
>>> later point, right?
>>>
>>> Regarding failed checkpoints killing the job: yes, this is currently the
>>> expected behaviour but there are plans to change this.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com>
>>> wrote:
>>>
>>> I think this is the offending piece. There is a catch all Exception,
>>> which IMHO should understand a recoverable exception from an unrecoverable
>>> on.
>>>
>>>
>>> try {
>>> completedCheckpoint = retrieveCompletedCheckpoint(ch
>>> eckpointStateHandle);
>>> if (completedCheckpoint != null) {
>>> completedCheckpoints.add(completedCheckpoint);
>>> }
>>> } catch (Exception e) {
>>> LOG.warn("Could not retrieve checkpoint. Removing it from the completed
>>> " +
>>> "checkpoint store.", e);
>>> // remove the checkpoint with broken state handle
>>> removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.
>>> f0);
>>> }
>>>
>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> So this is the issue and tell us that it is wrong. ZK had some state (
>>>> backed by hdfs ) that referred to a checkpoint ( the same exact last
>>>> successful checkpoint that was successful before NN screwed us ). When the
>>>> JM tried to recreate the state and b'coz NN was down failed to retrieve the
>>>> CHK handle from hdfs and conveniently ( and I think very wrongly ) removed
>>>> the CHK from being considered and cleaned the pointer ( though failed as
>>>> was NN was down and is obvious from the dangling file in recovery ) . The
>>>> metadata itself was on hdfs and failure in retrieving should have been a
>>>> stop all, not going to trying doing magic exception rather than starting
>>>> from a blank state.
>>>>
>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>> 44286 from state handle under /0000000000000044286. This indicates that the
>>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Also note that  the zookeeper recovery did  ( sadly on the same hdfs
>>>>> cluster ) also showed the same behavior. It had the pointers to the chk
>>>>> point  ( I  think that is what it does, keeps metadata of where the
>>>>> checkpoint etc  ) .  It too decided to keep the recovery file from the
>>>>> failed state.
>>>>>
>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>
>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>
>>>>> This is getting a little interesting. What say you :)
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Another thing I noted was this thing
>>>>>>
>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>
>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>
>>>>>>
>>>>>> Generally what Flink does IMHO is that it replaces the chk point
>>>>>> directory with a new one. I see it happening now. Every minute it 
>>>>>> replaces
>>>>>> the old directory.  In this job's case however, it did not delete the
>>>>>> 2017-10-04 13:54  and hence the chk-44286 directory.  This was the last
>>>>>> chk-44286 (  I think  )  successfully created before NN had issues but as
>>>>>> is usual did not delete this  chk-44286. It looks as if it started with a
>>>>>> blank slate ???????? Does this strike a chord ?????
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Fabian,
>>>>>>>                       First of all congratulations on this fabulous
>>>>>>> framework. I have worked with GDF and though GDF has some natural pluses
>>>>>>> Flink's state management is far more advanced. With kafka as a source it
>>>>>>> negates issues GDF has ( GDF integration with pub/sub is organic and 
>>>>>>> that
>>>>>>> is to be expected but non FIFO pub/sub is an issue with windows on event
>>>>>>> time etc )
>>>>>>>
>>>>>>>                    Coming back to this issue. We have that same
>>>>>>> kafka topic feeding a streaming druid datasource and we do not see any
>>>>>>> issue there, so so data loss on the source, kafka is not applicable. I 
>>>>>>> am
>>>>>>> totally certain that the "retention" time was not an issue. It is 4
>>>>>>> days of retention and we fixed this issue within 30 minutes. We could
>>>>>>> replay kafka with a new consumer group.id and that worked fine.
>>>>>>>
>>>>>>>
>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>
>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is
>>>>>>> the default true. I bring this up to see whether flink will in any
>>>>>>> circumstance drive consumption on the kafka perceived offset rather than
>>>>>>> the one in the checkpoint.
>>>>>>>
>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set.  The
>>>>>>> state is big enough though therefore IMHO no way the state is stored 
>>>>>>> along
>>>>>>> with the meta data in JM ( or ZK ? ) . The reason I bring this up is to
>>>>>>> make sure when you say that the size has to be less than 1024bytes , you
>>>>>>> are talking about cumulative state of the pipeine.
>>>>>>>
>>>>>>> * We have a good sense of SP ( save point )  and CP ( checkpoint )
>>>>>>> and certainly understand that they actually are not dissimilar. However 
>>>>>>> in
>>>>>>> this case there were multiple attempts to restart the pipe before it
>>>>>>> finally succeeded.
>>>>>>>
>>>>>>> * Other hdfs related poperties.
>>>>>>>
>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>>>>> flink_hdfs_root %>
>>>>>>>
>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>
>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= 
>>>>>>> flink_hdfs_root %>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Do these make sense ? Is there anything else I should look at.
>>>>>>> Please also note that it is the second time this has happened. The first
>>>>>>> time I was vacationing and was not privy to the state of the flink
>>>>>>> pipeline, but the net effect were similar. The counts for the first 
>>>>>>> window
>>>>>>> after an internal restart dropped.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thank you for you patience and regards,
>>>>>>>
>>>>>>> Vishal
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>>
>>>>>>>> window operators are always stateful because the operator needs to
>>>>>>>> remember previously received events (WindowFunction) or intermediate
>>>>>>>> results (ReduceFunction).
>>>>>>>> Given the program you described, a checkpoint should include the
>>>>>>>> Kafka consumer offset and the state of the window operator. If the 
>>>>>>>> program
>>>>>>>> eventually successfully (i.e., without an error) recovered from the 
>>>>>>>> last
>>>>>>>> checkpoint, all its state should have been restored. Since the last
>>>>>>>> checkpoint was before HDFS went into safe mode, the program would have 
>>>>>>>> been
>>>>>>>> reset to that point. If the Kafka retention time is less than the time 
>>>>>>>> it
>>>>>>>> took to fix HDFS you would have lost data because it would have been
>>>>>>>> removed from Kafka. If that's not the case, we need to investigate this
>>>>>>>> further because a checkpoint recovery must not result in state loss.
>>>>>>>>
>>>>>>>> Restoring from a savepoint is not so much different from automatic
>>>>>>>> checkpoint recovery. Given that you have a completed savepoint, you can
>>>>>>>> restart the job from that point. The main difference is that 
>>>>>>>> checkpoints
>>>>>>>> are only used for internal recovery and usually discarded once the job 
>>>>>>>> is
>>>>>>>> terminated while savepoints are retained.
>>>>>>>>
>>>>>>>> Regarding your question if a failed checkpoint should cause the job
>>>>>>>> to fail and recover I'm not sure what the current status is.
>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>
>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>
>>>>>>>>> keyBy(0)
>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello folks,
>>>>>>>>>>
>>>>>>>>>> As far as I know checkpoint failure should be ignored and retried
>>>>>>>>>> with potentially larger state. I had this situation
>>>>>>>>>>
>>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>>> * exception was thrown
>>>>>>>>>>
>>>>>>>>>>     
>>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>>     ..................
>>>>>>>>>>
>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(
>>>>>>>>>> HadoopFileSystem.java:453)
>>>>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.
>>>>>>>>>> mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheck
>>>>>>>>>> pointStreamFactory.java:132)
>>>>>>>>>>
>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>>>>
>>>>>>>>>> I would not have worried about the restart, but it was evident
>>>>>>>>>> that I lost my operator state. Either it was my kafka consumer that 
>>>>>>>>>> kept on
>>>>>>>>>> advancing it's offset between a start and the next checkpoint 
>>>>>>>>>> failure ( a
>>>>>>>>>> minute's worth ) or the the operator that had partial aggregates was 
>>>>>>>>>> lost.
>>>>>>>>>> I have a 15 minute window of counts on a keyed operator
>>>>>>>>>>
>>>>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>>>>
>>>>>>>>>> The questions thus are
>>>>>>>>>>
>>>>>>>>>> * Should a pipeline be restarted if checkpoint fails ?
>>>>>>>>>> * Why on restart did the operator state did not recreate ?
>>>>>>>>>> * Is the nature of the exception thrown have to do with any of
>>>>>>>>>> this b'coz suspend and resume from a save point work as expected ?
>>>>>>>>>> * And though I am pretty sure, are operators like the Window
>>>>>>>>>> operator stateful by drfault and thus if I have 
>>>>>>>>>> timeWindow(Time.of(window_
>>>>>>>>>> size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new
>>>>>>>>>> WindowFunction()), the state is managed by flink ?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>

Reply via email to