Did you see my second mail?
> On 24. Jan 2018, at 12:50, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> As in, if there are chk handles in ZK, there should be no reason to start a new
> job from scratch ( bad handle, no hdfs connectivity etc ).
> Yes, that sums it up.
>
> On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Wait a sec, I just checked out the code again and it seems we already do
> that:
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>
>
> If there were some checkpoints but none could be read, we fail recovery.
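>
> Roughly, the check in question looks like this ( paraphrased from memory, so
> treat the exact field and message names as approximations ):
>
> // at the end of ZooKeeperCompletedCheckpointStore#recover():
> // checkpoint nodes existed in ZooKeeper but none of them could be read back
> if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
>     throw new FlinkException(
>         "Could not read any of the " + numberOfInitialCheckpoints
>             + " checkpoints from storage.");
> }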
>
>
>> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> That sounds reasonable: We would keep the first fix, i.e. never delete
>> checkpoints if they're "corrupt", only when they're subsumed. Additionally,
>> we fail the job if there are some checkpoints in ZooKeeper but none of them
>> can be restored to prevent the case where a job starts from scratch even
>> though it shouldn't.
>>
>> Does that sum it up?
>>
>>> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>> If we hit the retry limit, abort the job. In our case we will restart from
>>> the last SP ( like any production shop we take one n times a day ) and that,
>>> I would think, should be OK for most folks ?
>>>
>>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> Thank you for considering this. If I understand you correctly:
>>>
>>> * A CHK pointer on ZK for a CHK state on hdfs was written successfully.
>>> * Some issue restarted the pipeline.
>>> * The NN was unfortunately down, so flink could not retrieve the CHK state
>>> from the CHK pointer on ZK.
>>>
>>> Before
>>>
>>> * The CHK pointer was being removed and the job started from a brand new
>>> slate.
>>>
>>> After ( this fix on 1.4 +)
>>>
>>> * Do not delete the CHK pointer ( it has to be subsumed to be deleted ).
>>> * Flink keeps using this CHK pointer ( if retry is > 0 and until we hit the
>>> retry limit — see the sketch below ) to restore state.
>>> * NN comes back.
>>> * Flink restores state on the next retry.
>>>
>>> I would hope that is the sequence to follow.
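>>>
>>> By retry limit I just mean the regular restart strategy, e.g. something
>>> like this ( a sketch only, the numbers are made up ):
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> // retry up to 10 times, waiting 60 seconds between attempts, before giving up for good
>>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.of(60, TimeUnit.SECONDS)));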
>>>
>>> Regards.
>>>
>>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> Hi Vishal,
>>>
>>> I think you might be right. We fixed the problem that checkpoints were
>>> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we
>>> still have the problem that if the DFS is not up at all then it will look
>>> as if the job is starting from scratch. The alternative is failing the job,
>>> in which case you will also never be able to restore from a checkpoint.
>>> What do you think?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Sorry for the late reply.
>>>>
>>>> I created FLINK-8487 [1] to track this problem.
>>>>
>>>> @Vishal, can you have a look and check if I forgot some details? I logged
>>>> the issue for Flink 1.3.2, is that correct?
>>>> Please add more information if you think it is relevant.
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>>
>>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>> Or this one
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>>
>>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> ping.
>>>>
>>>> This happened again on production and it seems reasonable to abort
>>>> when a checkpoint is not found rather than behave as if it is a brand new
>>>> pipeline.
>>>>
>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> Folks, sorry for being late on this. Can somebody with knowledge of
>>>> this code base create a jira issue for the above ? We have seen this more
>>>> than once on production.
>>>>
>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> Hi Vishal,
>>>>
>>>> Some relevant Jira issues for you are:
>>>>
>>>> - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>>>> - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>>>> - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> it would be great if you could create a JIRA ticket with Blocker priority.
>>>>> Please add all relevant information of your detailed analysis, add a link
>>>>> to this email thread (see [1] for the web archive of the mailing list),
>>>>> and post the id of the JIRA issue here.
>>>>>
>>>>> Thanks for looking into this!
>>>>>
>>>>> Best regards,
>>>>> Fabian
>>>>>
>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>
>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>> Thank you for confirming.
>>>>>
>>>>>
>>>>> I think this is a critical bug. In essence any checkpoint store (
>>>>> hdfs/S3/File ) will lose state if it is unavailable at resume. This
>>>>> becomes all the more painful with your confirming that "failed
>>>>> checkpoints kill the job", because essentially it means that if the
>>>>> remote store is unavailable during a checkpoint then you have lost
>>>>> state ( unless of course you have no retry, or an unbounded retry
>>>>> delay that you hope the store revives within ). Remember that the
>>>>> first retry failure will create new state, according to the code as
>>>>> written, iff the remote store is down. We would rather have a
>>>>> configurable property that establishes our desire to abort, something
>>>>> like an "abort_retry_on_chkretrevalfailure" ( see the sketch below ).
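>>>>>
>>>>> Purely hypothetical illustration of that property ( the key name and the
>>>>> way the flag is read are made up by me, nothing like this exists today ):
>>>>>
>>>>> boolean abortOnRetrievalFailure =
>>>>>         config.getBoolean("abort_retry_on_chkretrevalfailure", false);
>>>>> if (abortOnRetrievalFailure && completedCheckpoints.isEmpty()
>>>>>         && numberOfInitialCheckpoints > 0) {
>>>>>     // checkpoints exist in ZK but none could be retrieved from the store:
>>>>>     // fail recovery instead of starting from a blank slate
>>>>>     throw new FlinkException(
>>>>>             "Aborting recovery: completed checkpoints exist but none could be read.");
>>>>> }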
>>>>>
>>>>>
>>>>> In our case it is very important that we do not undercount a window ( one
>>>>> reason we use flink and its awesome failure guarantees ), as various
>>>>> alarms sound ( we do anomaly detection on the time series ).
>>>>>
>>>>> Please create a jira ticket for us to follow or we could do it.
>>>>>
>>>>>
>>>>> PS Not aborting on checkpoint failure, up to a configurable limit, is very
>>>>> important too.
>>>>>
>>>>>
>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>> Hi Vishal,
>>>>>
>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>
>>>>> With your last mail you're basically saying that the checkpoint could not
>>>>> be restored because your HDFS was temporarily down. If Flink had not
>>>>> deleted that checkpoint it might have been possible to restore it at a
>>>>> later point, right?
>>>>>
>>>>> Regarding failed checkpoints killing the job: yes, this is currently the
>>>>> expected behaviour but there are plans to change this.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>> I think this is the offending piece. There is a catch-all Exception
>>>>>> handler, which IMHO should distinguish a recoverable exception from an
>>>>>> unrecoverable one.
>>>>>>
>>>>>>
>>>>>> try {
>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>     if (completedCheckpoint != null) {
>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>     }
>>>>>> } catch (Exception e) {
>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>             "checkpoint store.", e);
>>>>>>
>>>>>>     // remove the checkpoint with broken state handle
>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>> }
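>>>>>>
>>>>>> What I would have hoped for is something more along these lines ( just a
>>>>>> sketch of the idea from my side, not an actual patch ):
>>>>>>
>>>>>> } catch (IOException ioe) {
>>>>>>     // store unreachable ( e.g. NN down or in safe mode ): the handle may be
>>>>>>     // perfectly fine, so keep the pointer and fail recovery so a later retry can succeed
>>>>>>     throw new FlinkException(
>>>>>>             "Could not retrieve checkpoint " + checkpointStateHandle.f1 +
>>>>>>                     "; keeping its handle and failing recovery.", ioe);
>>>>>> } catch (Exception e) {
>>>>>>     // genuinely broken handle: only now does it seem reasonable to drop it
>>>>>>     LOG.warn("Checkpoint handle is broken. Removing it from the completed " +
>>>>>>             "checkpoint store.", e);
>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>> }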
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>> So this is the issue; tell us if it is wrong. ZK had some state (
>>>>>> backed by hdfs ) that referred to a checkpoint ( the exact last
>>>>>> checkpoint that was successful before the NN screwed us ). When the JM
>>>>>> tried to recreate the state, it failed to retrieve the CHK handle from
>>>>>> hdfs because the NN was down, and conveniently ( and I think very
>>>>>> wrongly ) removed the CHK from consideration and tried to clean up the
>>>>>> pointer ( the cleanup also failed since the NN was down, which is
>>>>>> obvious from the dangling file in recovery ). The metadata itself was
>>>>>> on hdfs, and a failure to retrieve it should have been a stop-the-world
>>>>>> exception rather than starting from a blank state.
>>>>>>
>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>>>> 44286 from state handle under /0000000000000044286. This indicates that
>>>>>> the retrieved state handle is broken. Try cleaning the state handle
>>>>>> store.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>> Also note that the zookeeper recovery directory ( sadly on the same hdfs
>>>>>> cluster ) showed the same behavior. It had the pointers to the chk
>>>>>> point ( I think that is what it does, keeps metadata of where the
>>>>>> checkpoint lives etc ). It too decided to keep the recovery file from the
>>>>>> failed state.
>>>>>> -rw-r--r-- 3 root hadoop 7041 2017-10-04 13:55
>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>
>>>>>> -rw-r--r-- 3 root hadoop 7044 2017-10-05 10:07
>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>
>>>>>> This is getting a little interesting. What say you :)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>> Another thing I noted was this:
>>>>>>
>>>>>> drwxr-xr-x - root hadoop 0 2017-10-04 13:54
>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>
>>>>>> drwxr-xr-x - root hadoop 0 2017-10-05 09:15
>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>
>>>>>>
>>>>>>
>>>>>> Generally what Flink does IMHO is replace the chk point directory with a
>>>>>> new one. I see it happening now: every minute it replaces the old
>>>>>> directory. In this job's case however, it did not delete the 2017-10-04
>>>>>> 13:54 directory, i.e. chk-44286. That was ( I think ) the last checkpoint
>>>>>> successfully created before the NN had issues, and unlike the usual flow
>>>>>> it was never deleted. It looks as if the job started with a blank slate.
>>>>>> Does this strike a chord ?
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>> Hello Fabian,
>>>>>> First of all congratulations on this fabulous
>>>>>> framework. I have worked with GDF and though GDF has some natural pluses,
>>>>>> Flink's state management is far more advanced. With kafka as a source it
>>>>>> negates issues GDF has ( GDF integration with pub/sub is organic and
>>>>>> that is to be expected, but non-FIFO pub/sub is an issue with windows on
>>>>>> event time etc ).
>>>>>>
>>>>>> Coming back to this issue. We have that same kafka
>>>>>> topic feeding a streaming druid datasource and we do not see any issue
>>>>>> there, so data loss at the source, kafka, is not applicable. I am
>>>>>> totally certain that the "retention" time was not an issue. It is 4 days
>>>>>> of retention and we fixed this issue within 30 minutes. We could replay
>>>>>> kafka with a new consumer group.id and that worked
>>>>>> fine.
>>>>>>
>>>>>>
>>>>>> Note these properties and see if they strike a chord.
>>>>>>
>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is left
>>>>>> at the default, true ( see the consumer sketch after this list ). I bring
>>>>>> this up to ask whether flink will under any circumstance drive consumption
>>>>>> from the kafka-committed offset rather than the one in the checkpoint.
>>>>>>
>>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big
>>>>>> enough though, so IMHO there is no way the state is stored along with the
>>>>>> metadata in the JM ( or ZK ? ). The reason I bring this up is to make sure
>>>>>> that when you say the size has to be less than 1024 bytes, you are talking
>>>>>> about the cumulative state of the pipeline.
>>>>>>
>>>>>> * We have a good sense of SP ( save point ) and CP ( checkpoint ) and
>>>>>> certainly understand that they actually are not dissimilar. However in
>>>>>> this case there were multiple attempts to restart the pipe before it
>>>>>> finally succeeded.
>>>>>>
>>>>>> * Other hdfs related properties:
>>>>>>
>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
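>>>>>>
>>>>>> For reference, the consumer is wired up roughly like this ( simplified;
>>>>>> the consumer class version, topic, schema and property values here are
>>>>>> placeholders rather than our exact setup ):
>>>>>>
>>>>>> Properties props = new Properties();
>>>>>> props.setProperty("bootstrap.servers", "broker1:9092");
>>>>>> props.setProperty("group.id", "prod-consumer");
>>>>>>
>>>>>> FlinkKafkaConsumer010<String> consumer =
>>>>>>         new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
>>>>>> // default is true: offsets are committed back to kafka on completed checkpoints,
>>>>>> // but on restore flink should use the offsets stored in the checkpoint,
>>>>>> // not the ones committed to kafka
>>>>>> consumer.setCommitOffsetsOnCheckpoints(true);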
>>>>>>
>>>>>>
>>>>>> Do these make sense ? Is there anything else I should look at ? Please
>>>>>> also note that this is the second time this has happened. The first time I
>>>>>> was vacationing and was not privy to the state of the flink pipeline,
>>>>>> but the net effect was similar: the counts for the first window after
>>>>>> an internal restart dropped.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thank you for your patience and regards,
>>>>>>
>>>>>> Vishal
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>> Hi Vishal,
>>>>>>
>>>>>> window operators are always stateful because the operator needs to
>>>>>> remember previously received events (WindowFunction) or intermediate
>>>>>> results (ReduceFunction).
>>>>>> Given the program you described, a checkpoint should include the Kafka
>>>>>> consumer offset and the state of the window operator. If the program
>>>>>> eventually successfully (i.e., without an error) recovered from the last
>>>>>> checkpoint, all its state should have been restored. Since the last
>>>>>> checkpoint was before HDFS went into safe mode, the program would have
>>>>>> been reset to that point. If the Kafka retention time is less than the
>>>>>> time it took to fix HDFS you would have lost data because it would have
>>>>>> been removed from Kafka. If that's not the case, we need to investigate
>>>>>> this further because a checkpoint recovery must not result in state loss.
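>>>>>>
>>>>>> To make that concrete, with a setup roughly like the following ( a sketch;
>>>>>> the interval and checkpoint path are made up, and exception handling is
>>>>>> omitted ), a checkpoint covers both the Kafka offsets and the window
>>>>>> contents:
>>>>>>
>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> // snapshot the whole pipeline ( source offsets + window state ) every 60 seconds
>>>>>> env.enableCheckpointing(60_000);
>>>>>> // RocksDB backend writing checkpoints to the configured HDFS directory
>>>>>> env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));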
>>>>>>
>>>>>> Restoring from a savepoint is not so much different from automatic
>>>>>> checkpoint recovery. Given that you have a completed savepoint, you can
>>>>>> restart the job from that point. The main difference is that checkpoints
>>>>>> are only used for internal recovery and usually discarded once the job
>>>>>> is terminated while savepoints are retained.
>>>>>>
>>>>>> Regarding your question if a failed checkpoint should cause the job to
>>>>>> fail and recover I'm not sure what the current status is.
>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>> To add to it, my pipeline is a simple
>>>>>>
>>>>>> keyBy(0)
>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>
>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>> Hello folks,
>>>>>>
>>>>>> As far as I know, a checkpoint failure should be ignored and retried with
>>>>>> a potentially larger state. I had this situation:
>>>>>>
>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>> * exception was thrown
>>>>>>
>>>>>>
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>> https://s.apache.org/sbnn-error
>>>>>> ..................
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>> at
>>>>>> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>
>>>>>> * The pipeline came back after a few restarts and checkpoint failures,
>>>>>> after the hdfs issues were resolved.
>>>>>>
>>>>>> I would not have worried about the restart, but it was evident that I
>>>>>> lost my operator state. Either it was my kafka consumer that kept on
>>>>>> advancing its offset between a start and the next checkpoint failure (
>>>>>> a minute's worth ) or the operator that had partial aggregates was
>>>>>> lost. I have a 15 minute window of counts on a keyed operator.
>>>>>>
>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>
>>>>>> The questions thus are
>>>>>>
>>>>>> * Should a pipeline be restarted if a checkpoint fails ?
>>>>>> * Why, on restart, was the operator state not recreated ?
>>>>>> * Does the nature of the exception thrown have anything to do with this,
>>>>>> given that suspend and resume from a save point work as expected ?
>>>>>> * And though I am pretty sure, are operators like the Window operator
>>>>>> stateful by default, and thus if I have timeWindow(Time.of(window_size,
>>>>>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()),
>>>>>> is the state managed by flink ?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>
>