To add to this, we are assuming that the default configuration will fail a
pipeline if  a checkpoint fails and will hit the recover loop only and only
if the retry limit is not reached




On Thu, Jan 25, 2018 at 7:00 AM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Sorry.
>
> There are 2 scenerios
>
>   * Idempotent Sinks Use Case where we would want to restore from the
> latest valid checkpoint.  If I understand the code correctly we try to
> retrieve all completed checkpoints  for all handles in ZK and abort ( throw
> an exception ) if there are handles but no corresponding complete
> checkpoints in hdfs,  else we use the latest valid checkpoint state.  On
> abort a restart  and thus restore of the  pipe  is issued repeating the
> above execution. If the failure in hdfs was transient a retry will succeed
> else when the  retry limit is reached the pipeline is aborted for good.
>
>
> * Non Idempotent Sinks where we have no retries. We do not want to recover
> from the last available checkpoint as the above code will do as the more
> into history we go the more duplicates will be delivered. The only solution
> is use exactly once semantics of the source and sinks if possible.
>
>
>
>
>
>
>
>
> On Wed, Jan 24, 2018 at 7:20 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Did you see my second mail?
>>
>>
>> On 24. Jan 2018, at 12:50, Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>> As in, if there are chk handles in zk, there should no reason to start a
>> new job ( bad handle, no hdfs connectivity etc ),
>>  yes that sums it up.
>>
>> On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Wait a sec, I just checked out the code again and it seems we already do
>>> that: https://github.com/apache/flink/blob/9071e3befb8c279f7
>>> 3c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apac
>>> he/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>>>
>>> If there were some checkpoints but none could be read we fail recovery.
>>>
>>>
>>> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> That sounds reasonable: We would keep the first fix, i.e. never delete
>>> checkpoints if they're "corrupt", only when they're subsumed. Additionally,
>>> we fail the job if there are some checkpoints in ZooKeeper but none of them
>>> can be restored to prevent the case where a job starts from scratch even
>>> though it shouldn't.
>>>
>>> Does that sum it up?
>>>
>>> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com>
>>> wrote:
>>>
>>> If we hit the retry limit, abort the job. In our case we will restart
>>> from the last SP ( we as any production pile do it is n time s a day )  and
>>> that I would think should be OK for most folks ?
>>>
>>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Thank you for considering this. If I understand you correctly.
>>>>
>>>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>>>> * Some issue restarted the pipeline.
>>>> * The NN was down unfortunately and flink could not retrieve the  CHK
>>>> state from the CHK pointer on ZK.
>>>>
>>>> Before
>>>>
>>>> * The CHK pointer was being removed and the job started from a brand
>>>> new slate.
>>>>
>>>> After ( this fix on 1.4 +)
>>>>
>>>> * do not delete the CHK pointer ( It has to be subsumed to be deleted
>>>> ).
>>>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit
>>>> any retry limit ) to restore state
>>>> * NN comes back
>>>> * Flink restores state on the next retry.
>>>>
>>>> I would hope that is the sequence to follow.
>>>>
>>>> Regards.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> I think you might be right. We fixed the problem that checkpoints
>>>>> where dropped via https://issues.apache.org/jira/browse/FLINK-7783.
>>>>> However, we still have the problem that if the DFS is not up at all then 
>>>>> it
>>>>> will look as if the job is starting from scratch. However, the alternative
>>>>> is failing the job, in which case you will also never be able to restore
>>>>> from a checkpoint. What do you think?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Sorry for the late reply.
>>>>>
>>>>> I created FLINK-8487 [1] to track this problem
>>>>>
>>>>> @Vishal, can you have a look and check if if forgot some details? I
>>>>> logged the issue for Flink 1.3.2, is that correct?
>>>>> Please add more information if you think it is relevant.
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>>>
>>>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>
>>>>> :
>>>>>
>>>>>> Or this one
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>>>>
>>>>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> ping.
>>>>>>>
>>>>>>>     This happened again on production and it seems reasonable to
>>>>>>> abort when a checkpoint is not found rather than behave as if it is a 
>>>>>>> brand
>>>>>>> new pipeline.
>>>>>>>
>>>>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Folks sorry for being late on this. Can some body with the
>>>>>>>> knowledge of this code base create a jira issue for the above ? We have
>>>>>>>> seen this more than once on production.
>>>>>>>>
>>>>>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <
>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> Some relevant Jira issues for you are:
>>>>>>>>>
>>>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow
>>>>>>>>> skipping failed checkpoints
>>>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>>>>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>>>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>>>>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> it would be great if you could create a JIRA ticket with Blocker
>>>>>>>>> priority.
>>>>>>>>> Please add all relevant information of your detailed analysis, add
>>>>>>>>> a link to this email thread (see [1] for the web archive of the 
>>>>>>>>> mailing
>>>>>>>>> list), and post the id of the JIRA issue here.
>>>>>>>>>
>>>>>>>>> Thanks for looking into this!
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Fabian
>>>>>>>>>
>>>>>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>>>>>
>>>>>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Thank you for confirming.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  I think this is a critical bug. In essence any checkpoint store
>>>>>>>>>> ( hdfs/S3/File)  will loose state if it is unavailable at resume. 
>>>>>>>>>> This
>>>>>>>>>> becomes all the more painful with your confirming that  "failed
>>>>>>>>>> checkpoints killing the job"  b'coz essentially it mean that if 
>>>>>>>>>> remote
>>>>>>>>>> store in unavailable  during checkpoint than you have lost state ( 
>>>>>>>>>> till of
>>>>>>>>>> course you have a retry of none or an unbounded retry delay, a delay 
>>>>>>>>>> that
>>>>>>>>>> you *hope* the store revives in ) .. Remember  the first retry
>>>>>>>>>> failure  will cause new state according the code as written iff the 
>>>>>>>>>> remote
>>>>>>>>>> store is down. We would rather have a configurable property that
>>>>>>>>>> establishes  our desire to abort something like a
>>>>>>>>>> "abort_retry_on_chkretrevalfailure"
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> In our case it is very important that we do not undercount a
>>>>>>>>>> window, one reason we use flink and it's awesome failure guarantees, 
>>>>>>>>>> as
>>>>>>>>>> various alarms sound ( we do anomaly detection on the time series ).
>>>>>>>>>>
>>>>>>>>>> Please create a jira ticket for us to follow or we could do it.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> PS Not aborting on checkpointing, till a configurable limit is
>>>>>>>>>> very important too.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <
>>>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> I think you're right! And thanks for looking into this so
>>>>>>>>>>> deeply.
>>>>>>>>>>>
>>>>>>>>>>> With your last mail your basically saying, that the checkpoint
>>>>>>>>>>> could not be restored because your HDFS was temporarily down. If 
>>>>>>>>>>> Flink had
>>>>>>>>>>> not deleted that checkpoint it might have been possible to restore 
>>>>>>>>>>> it at a
>>>>>>>>>>> later point, right?
>>>>>>>>>>>
>>>>>>>>>>> Regarding failed checkpoints killing the job: yes, this is
>>>>>>>>>>> currently the expected behaviour but there are plans to change this.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>>
>>>>>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I think this is the offending piece. There is a catch all
>>>>>>>>>>> Exception, which IMHO should understand a recoverable exception 
>>>>>>>>>>> from an
>>>>>>>>>>> unrecoverable on.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> try {
>>>>>>>>>>> completedCheckpoint = retrieveCompletedCheckpoint(ch
>>>>>>>>>>> eckpointStateHandle);
>>>>>>>>>>> if (completedCheckpoint != null) {
>>>>>>>>>>> completedCheckpoints.add(completedCheckpoint);
>>>>>>>>>>> }
>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>> LOG.warn("Could not retrieve checkpoint. Removing it from the
>>>>>>>>>>> completed " +
>>>>>>>>>>> "checkpoint store.", e);
>>>>>>>>>>> // remove the checkpoint with broken state handle
>>>>>>>>>>> removeBrokenStateHandle(checkpointStateHandle.f1,
>>>>>>>>>>> checkpointStateHandle.f0);
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So this is the issue and tell us that it is wrong. ZK had some
>>>>>>>>>>>> state ( backed by hdfs ) that referred to a checkpoint ( the same 
>>>>>>>>>>>> exact
>>>>>>>>>>>> last successful checkpoint that was successful before NN screwed 
>>>>>>>>>>>> us ). When
>>>>>>>>>>>> the JM tried to recreate the state and b'coz NN was down failed to 
>>>>>>>>>>>> retrieve
>>>>>>>>>>>> the CHK handle from hdfs and conveniently ( and I think very 
>>>>>>>>>>>> wrongly )
>>>>>>>>>>>> removed the CHK from being considered and cleaned the pointer ( 
>>>>>>>>>>>> though
>>>>>>>>>>>> failed as was NN was down and is obvious from the dangling file in 
>>>>>>>>>>>> recovery
>>>>>>>>>>>> ) . The metadata itself was on hdfs and failure in retrieving 
>>>>>>>>>>>> should have
>>>>>>>>>>>> been a stop all, not going to trying doing magic exception rather 
>>>>>>>>>>>> than
>>>>>>>>>>>> starting from a blank state.
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve
>>>>>>>>>>>> checkpoint 44286 from state handle under /0000000000000044286.
>>>>>>>>>>>> This indicates that the retrieved state handle is broken. Try 
>>>>>>>>>>>> cleaning the
>>>>>>>>>>>> state handle store.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Also note that  the zookeeper recovery did  ( sadly on the
>>>>>>>>>>>>> same hdfs cluster ) also showed the same behavior. It had the 
>>>>>>>>>>>>> pointers to
>>>>>>>>>>>>> the chk point  ( I  think that is what it does, keeps metadata of 
>>>>>>>>>>>>> where the
>>>>>>>>>>>>> checkpoint etc  ) .  It too decided to keep the recovery file 
>>>>>>>>>>>>> from the
>>>>>>>>>>>>> failed state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>>>>>
>>>>>>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Another thing I noted was this thing
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk
>>>>>>>>>>>>>> -44286
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk
>>>>>>>>>>>>>> -45428
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Generally what Flink does IMHO is that it replaces the chk
>>>>>>>>>>>>>> point directory with a new one. I see it happening now. Every 
>>>>>>>>>>>>>> minute it
>>>>>>>>>>>>>> replaces the old directory.  In this job's case however, it did 
>>>>>>>>>>>>>> not delete
>>>>>>>>>>>>>> the 2017-10-04 13:54  and hence the chk-44286 directory.  This 
>>>>>>>>>>>>>> was the last
>>>>>>>>>>>>>> chk-44286 (  I think  )  successfully created before NN had 
>>>>>>>>>>>>>> issues but as
>>>>>>>>>>>>>> is usual did not delete this  chk-44286. It looks as if it 
>>>>>>>>>>>>>> started with a
>>>>>>>>>>>>>> blank slate ???????? Does this strike a chord ?????
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>>>>>                       First of all congratulations on this
>>>>>>>>>>>>>>> fabulous framework. I have worked with GDF and though GDF has 
>>>>>>>>>>>>>>> some natural
>>>>>>>>>>>>>>> pluses Flink's state management is far more advanced. With 
>>>>>>>>>>>>>>> kafka as a
>>>>>>>>>>>>>>> source it negates issues GDF has ( GDF integration with pub/sub 
>>>>>>>>>>>>>>> is organic
>>>>>>>>>>>>>>> and that is to be expected but non FIFO pub/sub is an issue 
>>>>>>>>>>>>>>> with windows on
>>>>>>>>>>>>>>> event time etc )
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>                    Coming back to this issue. We have that
>>>>>>>>>>>>>>> same kafka topic feeding a streaming druid datasource and we do 
>>>>>>>>>>>>>>> not see any
>>>>>>>>>>>>>>> issue there, so so data loss on the source, kafka is not 
>>>>>>>>>>>>>>> applicable. I am
>>>>>>>>>>>>>>> totally certain that the "retention" time was not an issue.
>>>>>>>>>>>>>>> It is 4 days of retention and we fixed this issue within 30 
>>>>>>>>>>>>>>> minutes. We
>>>>>>>>>>>>>>> could replay kafka with a new consumer group.id and that
>>>>>>>>>>>>>>> worked fine.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka
>>>>>>>>>>>>>>> consumers is the default true. I bring this up to see whether 
>>>>>>>>>>>>>>> flink will in
>>>>>>>>>>>>>>> any circumstance drive consumption on the kafka perceived 
>>>>>>>>>>>>>>> offset rather
>>>>>>>>>>>>>>> than the one in the checkpoint.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been
>>>>>>>>>>>>>>> set.  The state is big enough though therefore IMHO no way the 
>>>>>>>>>>>>>>> state is
>>>>>>>>>>>>>>> stored along with the meta data in JM ( or ZK ? ) . The reason 
>>>>>>>>>>>>>>> I bring this
>>>>>>>>>>>>>>> up is to make sure when you say that the size has to be less 
>>>>>>>>>>>>>>> than 1024bytes
>>>>>>>>>>>>>>> , you are talking about cumulative state of the pipeine.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * We have a good sense of SP ( save point )  and CP (
>>>>>>>>>>>>>>> checkpoint ) and certainly understand that they actually are not
>>>>>>>>>>>>>>> dissimilar. However in this case there were multiple attempts 
>>>>>>>>>>>>>>> to restart
>>>>>>>>>>>>>>> the pipe before it finally succeeded.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * Other hdfs related poperties.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>>>>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= 
>>>>>>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= 
>>>>>>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Do these make sense ? Is there anything else I should look
>>>>>>>>>>>>>>> at.  Please also note that it is the second time this has 
>>>>>>>>>>>>>>> happened. The
>>>>>>>>>>>>>>> first time I was vacationing and was not privy to the state of 
>>>>>>>>>>>>>>> the flink
>>>>>>>>>>>>>>> pipeline, but the net effect were similar. The counts for the 
>>>>>>>>>>>>>>> first window
>>>>>>>>>>>>>>> after an internal restart dropped.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for you patience and regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Vishal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <
>>>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> window operators are always stateful because the operator
>>>>>>>>>>>>>>>> needs to remember previously received events (WindowFunction) 
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>> intermediate results (ReduceFunction).
>>>>>>>>>>>>>>>> Given the program you described, a checkpoint should
>>>>>>>>>>>>>>>> include the Kafka consumer offset and the state of the window 
>>>>>>>>>>>>>>>> operator. If
>>>>>>>>>>>>>>>> the program eventually successfully (i.e., without an error) 
>>>>>>>>>>>>>>>> recovered from
>>>>>>>>>>>>>>>> the last checkpoint, all its state should have been restored. 
>>>>>>>>>>>>>>>> Since the
>>>>>>>>>>>>>>>> last checkpoint was before HDFS went into safe mode, the 
>>>>>>>>>>>>>>>> program would have
>>>>>>>>>>>>>>>> been reset to that point. If the Kafka retention time is less 
>>>>>>>>>>>>>>>> than the time
>>>>>>>>>>>>>>>> it took to fix HDFS you would have lost data because it would 
>>>>>>>>>>>>>>>> have been
>>>>>>>>>>>>>>>> removed from Kafka. If that's not the case, we need to 
>>>>>>>>>>>>>>>> investigate this
>>>>>>>>>>>>>>>> further because a checkpoint recovery must not result in state 
>>>>>>>>>>>>>>>> loss.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>>>>>>> automatic checkpoint recovery. Given that you have a completed 
>>>>>>>>>>>>>>>> savepoint,
>>>>>>>>>>>>>>>> you can restart the job from that point. The main difference 
>>>>>>>>>>>>>>>> is that
>>>>>>>>>>>>>>>> checkpoints are only used for internal recovery and usually 
>>>>>>>>>>>>>>>> discarded once
>>>>>>>>>>>>>>>> the job is terminated while savepoints are retained.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding your question if a failed checkpoint should cause
>>>>>>>>>>>>>>>> the job to fail and recover I'm not sure what the current 
>>>>>>>>>>>>>>>> status is.
>>>>>>>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint
>>>>>>>>>>>>>>>> fails.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As far as I know checkpoint failure should be ignored and
>>>>>>>>>>>>>>>>>> retried with potentially larger state. I had this situation
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.
>>>>>>>>>>>>>>>>>> hadoop.ipc.StandbyException): Operation category WRITE
>>>>>>>>>>>>>>>>>> is not supported in state standby. Visit
>>>>>>>>>>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hd
>>>>>>>>>>>>>>>>>> fs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>>>>>         at org.apache.flink.core.fs.Safet
>>>>>>>>>>>>>>>>>> yNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.
>>>>>>>>>>>>>>>>>> java:111)
>>>>>>>>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>>>>>>>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheck
>>>>>>>>>>>>>>>>>> pointStreamFactory.java:132)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> * The pipeline came back after a few restarts and
>>>>>>>>>>>>>>>>>> checkpoint failures, after the hdfs issues were resolved.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I would not have worried about the restart, but it was
>>>>>>>>>>>>>>>>>> evident that I lost my operator state. Either it was my 
>>>>>>>>>>>>>>>>>> kafka consumer that
>>>>>>>>>>>>>>>>>> kept on advancing it's offset between a start and the next 
>>>>>>>>>>>>>>>>>> checkpoint
>>>>>>>>>>>>>>>>>> failure ( a minute's worth ) or the the operator that had 
>>>>>>>>>>>>>>>>>> partial
>>>>>>>>>>>>>>>>>> aggregates was lost. I have a 15 minute window of counts on 
>>>>>>>>>>>>>>>>>> a keyed operator
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am using ROCKS DB and of course have checkpointing
>>>>>>>>>>>>>>>>>> turned on.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The questions thus are
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> * Should a pipeline be restarted if checkpoint fails ?
>>>>>>>>>>>>>>>>>> * Why on restart did the operator state did not recreate ?
>>>>>>>>>>>>>>>>>> * Is the nature of the exception thrown have to do with
>>>>>>>>>>>>>>>>>> any of this b'coz suspend and resume from a save point work 
>>>>>>>>>>>>>>>>>> as expected ?
>>>>>>>>>>>>>>>>>> * And though I am pretty sure, are operators like the
>>>>>>>>>>>>>>>>>> Window operator stateful by drfault and thus if I have
>>>>>>>>>>>>>>>>>> timeWindow(Time.of(window_size,
>>>>>>>>>>>>>>>>>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new 
>>>>>>>>>>>>>>>>>> WindowFunction()), the
>>>>>>>>>>>>>>>>>> state is managed by flink ?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>
>>
>

Reply via email to