Hi Saurabh and Yun Tang,

I tend to agree with Yun Tang. Exposure of stop-with-checkpoint would
complicate the system for most of the users a bit too much, with very
little gain.

> 1. In producing a full snapshot, I see this is noted as follows in the
Flip

If you want to recover in CLAIM or NO_CLAIM mode, that's an orthogonal
question to whether you ended up with a savepoint or checkpoint at the end
of your last job. So that's not an issue here. As Yun Tang wrote, the only
difference is that stop-with-savepoint (vs a hypothetical
stop-with-checkpoint), has to copy out files from previous incremental
checkpoints that are still being referenced. I don't expect this to be a
big issue for the majority of use cases (only if you have a really huge
state, and the async phase of the checkpoint/savepoint is taking a
very long time - and only when assuming that you don't have another issue
that's causing slow file upload) . And if that's an issue, you still have
the other work-around for your initial problem: to manually trigger all of
the checkpoints as you wish via the REST API.

Best,
Piotrek





pon., 24 paź 2022 o 10:13 Yun Tang <[email protected]> napisał(a):

> Hi Saurabh,
>
> From the scope of implementation, I think stopping with native savepoint
> is very close to stopping with checkpoint. The only different part is the
> fast duplication over distributed file systems, which could be mitigated
> via distributed file system shallow copy. Thus, I don't think we should
> introduce another concept called stopping with checkpoint to make the total
> system more complex.
>
>
> Best
> Yun Tang
> ________________________________
> From: Saurabh Kaul <[email protected]>
> Sent: Saturday, October 22, 2022 2:45
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] REST API to suspend & resume checkpointing
>
> Hi Piotr,
>
> Sorry for the confusion. I'm referring to latency concerns at 2 points -
>
> 1. In producing a full snapshot, I see this is noted as follows in the Flip
> [1]:
> > This means that if the underlying FileSystem doesn’t support fast
> duplication, incremental savepoints will most likely be still slower
> compared to incremental checkpoints.
> In our use case for example, we use S3 file system so we have some
> additional overhead of duplication.
> 2. I mixed up the savepoint recovery modes, I think the default NO_CLAIM
> mode would be more appropriate for production use generally since CLAIM
> mode carries the risk of accidental savepoint deletion when future
> checkpoints can build on top of it. Manual checkpoint deletion is not a
> concern. NO_CLAIM would require the first successful checkpoint to be a
> full checkpoint.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-203
>
> On Fri, Oct 21, 2022 at 1:31 AM Piotr Nowojski <[email protected]>
> wrote:
>
> > Hi Saurabh,
> >
> > > 1. Stop with native savepoint does solve any races and produces a
> > > predictable restoration point, but producing a self-contained snapshot
> > and
> > > using CLAIM mode in re-running is not necessary here and adds latency.
> >
> > Why do you think it adds latency? CLAIM mode is actually the fastest one
> > with zero additional overheads. Native savepoints are almost an
> equivalent
> > of checkpoints.
> >
> > Best,
> > Piotrek
> >
> >
> > pt., 21 paź 2022 o 00:51 Saurabh Kaul <[email protected]> napisał(a):
> >
> > > Hi, thanks for the quick responses,
> > >
> > > I think a stop-with-checkpoint idea is overlapping well with the
> > > requirements.
> > >
> > > 1. Stop with native savepoint does solve any races and produces a
> > > predictable restoration point, but producing a self-contained snapshot
> > and
> > > using CLAIM mode in re-running is not necessary here and adds latency.
> > > Stop-with-checkpoint doesn't have these issues. It adds some downtime
> in
> > > waiting for a checkpoint to be completed but reduces replay time in the
> > new
> > > cluster which is a good trade-off. Since in this scenario of job
> > migration
> > > the job and/or job configuration is not changing; it should ideally be
> as
> > > fast as a regular failover scenario (like a TM going down).
> > > 2. Taking complete ownership of triggering checkpoints and making them
> > more
> > > configurable could be feasible but are less effective comparatively in
> > > terms of stopping the job for the primary purpose of low-downtime
> > migration
> > > of the job. Stop-with-checkpoint solves it more directly.
> > >
> > > Looking forward to hearing thoughts on this.
> > >
> > > On Thu, Oct 20, 2022 at 3:31 AM Piotr Nowojski <[email protected]>
> > > wrote:
> > >
> > > > Hi Saurabh,
> > > >
> > > > Thanks for reaching out with the proposal. I have some mixed feelings
> > > about
> > > > this for a couple of reasons:
> > > >
> > > > 1. It sounds like the core problem that you are describing is the
> race
> > > > condition between shutting down the cluster and completion of new
> > > > checkpoints. My first thought would be as Jing's, why don't you use
> > > > stop-with-savepoint? Especially the native savepoint? You can recover
> > > from
> > > > it using --claim mode, so the whole process should be quite fast
> > > actually.
> > > > 2. The same issue, not knowing the latest completed checkpoint id,
> > > plagued
> > > > us with some internal tests for quite a bit, so maybe this would also
> > be
> > > > worth considering to address instead? Like leaving in some text file
> > the
> > > > last completed checkpoint id? Or providing a way to read this from
> some
> > > > existing metadata files? However in our tests we actually
> fixed/worked
> > > > around that with manually triggering of checkpoints. The predecessor
> of
> > > > FLINK-27101 [1], FLINK-24280 [2], was implemented to address this
> exact
> > > > issue.  Which brings me to...
> > > > 3. You could actually just use the REST API to trigger all
> checkpoints
> > > > manually. The idea behind FLINK-27101 [1] was to add full flexibility
> > to
> > > > the users, without adding much complexity to the system. If we start
> > > adding
> > > > more REST calls to control checkpointing behaviour it would
> complicate
> > > the
> > > > system.
> > > > 4. If at all, I would think more towards a more generic idea of
> > > dynamically
> > > > reconfiguring the system. We could provide a generic way to
> dynamically
> > > > change configuration options. We wouldn't be able to support all
> > > > configurations, and furthermore, each "dynamic" option would have to
> be
> > > > handled/passed down to and through the system differently, BUT we
> > > wouldn't
> > > > have to do all of that at once. We could start with a very limited
> set
> > of
> > > > dynamic options, for example just with the checkpointing interval.
> This
> > > > must have been considered/discussed before, so I might be missing
> lots
> > of
> > > > things.
> > > > 5. Another direction, if 1. is not an option for some reason, is to
> > > provide
> > > > a stop-with-checkpoint feature?
> > > >
> > > > Best Piotrek
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > > > [2] https://issues.apache.org/jira/browse/FLINK-24280
> > > >
> > > > czw., 20 paź 2022 o 11:53 Jing Ge <[email protected]> napisał(a):
> > > >
> > > > > Hi Saurabh,
> > > > >
> > > > > In general, it is always good to add new features. I am not really
> > sure
> > > > if
> > > > > I understood your requirement. I guess it will be too long for you
> to
> > > > > resume the job with a created savepoint in the new stand-by Flink
> > > > cluster.
> > > > > But if it would be acceptable to you, you should not have the issue
> > you
> > > > > mentioned with the checkpoint. Speaking of checkpoint, if the
> > > checkpoint
> > > > > interval were set properly, it should be fine even if in some rare
> > > cases
> > > > > the last checkpoint was partially completed and is not selected.
> > > Another
> > > > > option could be to trigger a manual checkpoint and then use that
> one
> > to
> > > > > resume the job to maintain the low downtime.
> > > > >
> > > > > Best regards,
> > > > > JIng
> > > > >
> > > > > On Thu, Oct 20, 2022 at 7:53 AM Saurabh Kaul <[email protected]>
> > > wrote:
> > > > >
> > > > > > Hey everyone,
> > > > > >
> > > > > > I will create a FLIP, but wanted to gauge community opinion
> first.
> > > The
> > > > > > motivation is that production Flink applications frequently need
> to
> > > go
> > > > > > through node/image patching to update the software and AMI with
> > > latest
> > > > > > security fixes. These patching related restarts do not involve
> > > > > application
> > > > > > jar or parallelism updates and can therefore be done without
> costly
> > > > > > savepoint completion and restore cycles by relying on the last
> > > > checkpoint
> > > > > > state in order to achieve minimum downtime. In order to achieve
> > this,
> > > > we
> > > > > > currently rely on retained checkpoints and the following steps:
> > > > > >
> > > > > >    - Create new stand-by Flink cluster and submit application jar
> > > > > >    - Delete Flink TM deployment to stop processing & checkpoints
> on
> > > old
> > > > > >    cluster(reduce duplicates)
> > > > > >    - Query last completed checkpoint from REST API on JM of old
> > > cluster
> > > > > >    - Submit new job using last available checkpoint in new
> cluster,
> > > > > delete
> > > > > >    old cluster
> > > > > >
> > > > > > We have observed that this process will sometimes not select the
> > > latest
> > > > > > checkpoint as partially completed checkpoints race and finish
> after
> > > > > > querying the JM. Alternatives are to rely on creating other
> sources
> > > for
> > > > > > checkpoint info but this has complications, as discussed in [2].
> > > > Waiting
> > > > > > and force deleting task managers increases downtimes and doesn't
> > > > > guarantee
> > > > > > TM process termination respectively. In order to maintain low
> > > downtime,
> > > > > > duplicates and solve this race we can introduce an API to suspend
> > > > > > checkpointing. Querying the latest available checkpoint after
> > having
> > > > > > suspending checkpointing will guarantee that we can maintain
> > exactly
> > > > once
> > > > > > in such a scenario.
> > > > > >
> > > > > > This also acts as an extension to [1] where the feature to
> trigger
> > > > > > checkpoints through a control plane has been discussed and added.
> > It
> > > > > makes
> > > > > > the checkpointing process flexible and gives the user more
> control
> > in
> > > > > > scenarios like migrating applications and letting data processing
> > > catch
> > > > > up
> > > > > > temporarily.
> > > > > > We can implement this similar to [1] and expose a trigger to
> > suspend
> > > > and
> > > > > > resume checkpointing via CLI and REST API. We can add a parameter
> > to
> > > > > > suspend in 2 ways.
> > > > > >
> > > > > >    1. Suspend scheduled checkpoint trigger, doesn’t cancel any
> > still
> > > in
> > > > > >    progress checkpoints/savepoints but stops only future ones
> > > > > >    2. Suspend checkpoint coordinator, cancels in progress
> > > > > >    checkpoints/savepoints. Guarantees no racing checkpoint
> > completion
> > > > and
> > > > > >    could be used for canceling stuck checkpoints and help data
> > > > processing
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-26916
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to