Hi Piotr,

Thanks again for proposing the isProcessingBacklog concept.

After discussing with Becket Qin and thinking about this more, I agree it
is a better idea to add a top-level concept to all source operators to
address the target use-case.

The main reason that changed my mind is that isProcessingBacklog can be
described as an inherent/natural attribute of every source instance, and
its semantics do not need to depend on any specific checkpointing policy.
Also, we can hardcode the isProcessingBacklog behavior for the sources we
have considered so far (e.g. HybridSource and MySQL CDC source) without
asking users to explicitly configure the per-source behavior, which indeed
provides a better user experience.
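
To make the intended semantics a bit more concrete, here is a rough sketch
of how a job-level interval could be picked from the per-source
isProcessingBacklog values. Python is used only for brevity. The function
and parameter names are made up for illustration and are not the API in the
FLIP; the only assumption taken from this thread is that the longer interval
applies whenever any source is processing backlog.

```python
from typing import Iterable


def choose_checkpoint_interval_ms(
    source_backlog_flags: Iterable[bool],
    regular_interval_ms: int,
    backlog_interval_ms: int,
) -> int:
    """Pick the checkpointing interval for the whole job.

    Hypothetical helper: `source_backlog_flags` holds one
    isProcessingBacklog value per source, as reported to the JM.
    """
    # If any source is still processing backlog, use the longer interval.
    if any(source_backlog_flags):
        return backlog_interval_ms
    # Otherwise fall back to the regular (short) interval.
    return regular_interval_ms


if __name__ == "__main__":
    # Example: two sources, one of them still in its backlog phase.
    print(choose_checkpoint_interval_ms([False, True], 1_000, 30_000))
```

With this shape the checkpointing policy stays in one job-level place, and
each source only has to report whether it is processing backlog.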

I have updated the FLIP based on the latest suggestions. The latest FLIP no
longer introduces any per-source config that can be used by end-users. While I
agree with you that CheckpointTrigger can be a useful feature to address
additional use-cases, I am not sure it is necessary for the use-case
targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
in another FLIP?

Can you help take another look at the updated FLIP?

Best,
Dong



On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org>
wrote:

> Hi Dong,
>
> > Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
> any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
>
> Sorry I don't understand where you are getting those numbers from.
> Instead of trying to find loophole after loophole, could you try to think
> how a given loophole could be improved/solved?
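
For reference, the numbers in the quoted example can be reproduced with the
small sketch below. It assumes that the subtasks are backpressured
independently of each other and that the job counts as not-backpressured
only when no subtask is backpressured at the moment of evaluation. The
function name is made up for illustration.

```python
def expected_seconds_until_not_backpressured(
    p_subtask_backpressured: float,
    num_subtasks: int,
    evaluation_interval_sec: float = 1.0,
) -> float:
    """Expected wait until one evaluation sees no backpressured subtask."""
    # Probability that all subtasks are not backpressured at one evaluation,
    # assuming independence between subtasks.
    p_job_ok = (1.0 - p_subtask_backpressured) ** num_subtasks
    # Number of evaluations until the first success is geometric,
    # so its expected value is 1 / p_job_ok.
    return evaluation_interval_sec / p_job_ok


if __name__ == "__main__":
    seconds = expected_seconds_until_not_backpressured(0.01, 1000)
    print(f"{seconds:.0f} sec = {seconds / 3600:.1f} hours")
```
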
>
> > Hmm... I honestly think it will be useful to know the APIs due to the
> > following reasons.
>
> Please propose something. I don't think it's needed.
>
> > - For the use-case mentioned in FLIP-309 motivation section, would the
> APIs
> > of this alternative approach be more or less usable?
>
> Everything that you originally wanted to achieve in FLIP-309, you could do
> as well in my proposal.
> Vide my many mentions of the "hacky solution".
>
> > - Can these APIs reliably address the extra use-case (e.g. allow
> > checkpointing interval to change dynamically even during the unbounded
> > phase) as it claims?
>
> I don't see why not.
>
> > - Can these APIs be decoupled from the APIs currently proposed in
> FLIP-309?
>
> Yes
>
> > For example, if the APIs of this alternative approach can be decoupled
> from
> > the APIs currently proposed in FLIP-309, then it might be reasonable to
> > work on this extra use-case with a more advanced/complicated design
> > separately in a followup work.
>
> As I voiced my concerns previously, the current design of FLIP-309 would
> clog the public API and in the long run confuse the users. IMO It's
> addressing the
> problem in the wrong place.
>
> > Hmm.. do you mean we can do the following:
> > - Have all source operators emit a metric named "processingBacklog".
> > - Add a job-level config that specifies "the checkpointing interval to be
> > used when any source is processing backlog".
> > - The JM collects the "processingBacklog" periodically from all source
> > operators and uses the newly added config value as appropriate.
>
> Yes.
>
> > The challenge with this approach is that we need to define the semantics
> of
> > this "processingBacklog" metric and have all source operators
> > implement this metric. I am not sure we are able to do this yet without
> > having users explicitly provide this information on a per-source basis.
> >
> > Suppose the job reads from a bounded Kafka source, should it emit
> > "processingBacklog=true"? If yes, then the job might use long
> checkpointing
> > interval even
> > if the job is asked to process data starting from now to the next 1 hour.
> > If no, then the job might use the short checkpointing interval
> > even if the job is asked to re-process data starting from 7 days ago.
>
> Yes. The same can be said of your proposal. Your proposal has the very same
> issues
> that every source would have to implement it differently, most sources
> would
> have no idea how to properly calculate the new requested checkpoint
> interval,
> for those that do know how to do that, user would have to configure every
> source
> individually and yet again we would end up with a system, that works only
> partially in
> some special use cases (HybridSource), that's confusing the users even
> more.
>
> That's why I think the more generic solution, working primarily on the same
> metrics that are used by various auto scaling solutions (like Flink K8s
> operator's
> autoscaler) would be better. I proposed the hacky solution to:
> 1. show you that the generic solution is simply a superset of your proposal
> 2. if you are adamant that busyness/backpressured/records processing
> rate/pending records
>     metrics wouldn't cover your use case sufficiently (imo they can), then
> you can very easily
>     enhance this algorithm with using some hints from the sources. Like
> "processingBacklog==true"
>     to short circuit the main algorithm, if `processingBacklog` is
> available.
>
> Best,
> Piotrek
>
>
> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a):
>
> > Hi again Piotr,
> >
> > Thank you for the reply. Please see my reply inline.
> >
> > On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi again Dong,
> > >
> > > > I understand that JM will get the backpressure-related metrics every
> > time
> > > > the RestServerEndpoint receives the REST request to get these
> metrics.
> > > But
> > > > I am not sure if RestServerEndpoint is already always receiving the
> > REST
> > > > metrics at regular interval (suppose there is no human manually
> > > > opening/clicking the Flink Web UI). And if it does, what is the
> > interval?
> > >
> > > Good catch, I've thought that metrics are pre-emptively sent to JM
> every
> > 10
> > > seconds.
> > > Indeed that's not the case at the moment, and that would have to be
> > > improved.
> > >
> > > > I would be surprised if Flink is already paying this much overhead
> just
> > > for
> > > > metrics monitoring. That is the main reason I still doubt it is true.
> > Can
> > > > you show where this 100 ms is currently configured?
> > > >
> > > > Alternatively, maybe you mean that we should add extra code to invoke
> > the
> > > > REST API at 100 ms interval. Then that means we need to considerably
> > > > increase the network/cpu overhead at JM, where the overhead will
> > increase
> > > > as the number of TM/slots increase, which may pose risk to the
> > > scalability
> > > > of the proposed design. I am not sure we should do this. What do you
> > > think?
> > >
> > > Sorry. I didn't mean metric should be reported every 100ms. I meant
> that
> > > "backPressuredTimeMsPerSecond (metric) would report (a value of)
> > 100ms/s."
> > > once per metric interval (10s?).
> > >
> >
> > Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
> any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
> >
> >
> > > > - What is the interface of this CheckpointTrigger? For example, are
> we
> > > > going to give CheckpointTrigger a context that it can use to fetch
> > > > arbitrary metric values? This can help us understand what information
> > > this
> > > > user-defined CheckpointTrigger can use to make the checkpoint
> decision.
> > >
> > > I honestly don't think this is important at this stage of the
> discussion.
> > > It could have
> > > whatever interface we would deem to be best. Required things:
> > >
> > > - access to at least a subset of metrics that the given
> > `CheckpointTrigger`
> > > requests,
> > >   for example via some registration mechanism, so we don't have to
> fetch
> > > all of the
> > >   metrics all the time from TMs.
> > > - some way to influence `CheckpointCoordinator`. Either via manually
> > > triggering
> > >   checkpoints, and/or ability to change the checkpointing interval.
> > >
> >
> > Hmm... I honestly think it will be useful to know the APIs due to the
> > following reasons.
> >
> > We would need to know the concrete APIs to gauge the following:
> > - For the use-case mentioned in FLIP-309 motivation section, would the
> APIs
> > of this alternative approach be more or less usable?
> > - Can these APIs reliably address the extra use-case (e.g. allow
> > checkpointing interval to change dynamically even during the unbounded
> > phase) as it claims?
> > - Can these APIs be decoupled from the APIs currently proposed in
> FLIP-309?
> >
> > For example, if the APIs of this alternative approach can be decoupled
> from
> > the APIs currently proposed in FLIP-309, then it might be reasonable to
> > work on this extra use-case with a more advanced/complicated design
> > separately in a followup work.
> >
> >
> > > > - Where is this CheckpointTrigger running? For example, is it going
> to
> > > run
> > > > on the subtask of every source operator? Or is it going to run on the
> > JM?
> > >
> > > IMO on the JM.
> > >
> > > > - Are we going to provide a default implementation of this
> > > > CheckpointTrigger in Flink that implements the algorithm described
> > below,
> > > > or do we expect each source operator developer to implement their own
> > > > CheckpointTrigger?
> > >
> > > As I mentioned before, I think we should provide at the very least the
> > > implementation
> > > that replaces the current triggering mechanism (statically configured
> > > checkpointing interval)
> > > and it would be great to provide the backpressure monitoring trigger as
> > > well.
> > >
> >
> > I agree that if there is a good use-case that can be addressed by the
> > proposed CheckpointTrigger, then it is reasonable
> > to add CheckpointTrigger and replace the current triggering mechanism
> with
> > it.
> >
> > I also agree that we will likely find such a use-case. For example,
> suppose
> > the source records have event timestamps, then it is likely
> > that we can use the trigger to dynamically control the checkpointing
> > interval based on the difference between the watermark and current system
> > time.
> >
> > But I am not sure the addition of this CheckpointTrigger should be
> coupled
> > with FLIP-309. Whether or not it is coupled probably depends on the
> > concrete API design around CheckpointTrigger.
> >
> > > If you would be adamant that the backpressure monitoring doesn't cover
> > > well enough your use case, I would be ok to provide the hacky version
> > > that I also mentioned before:
> > >
> > > """
> > > Especially that if my proposed algorithm wouldn't work good enough,
> there
> > > is
> > > an obvious solution, that any source could add a metric, like let say
> > > "processingBacklog: true/false", and the `CheckpointTrigger`
> > > could use this as an override to always switch to the
> > > "slowCheckpointInterval". I don't think we need it, but that's always
> an
> > > option
> > > that would be basically equivalent to your original proposal.
> > > """
> > >
> >
> > Hmm.. do you mean we can do the following:
> > - Have all source operators emit a metric named "processingBacklog".
> > - Add a job-level config that specifies "the checkpointing interval to be
> > used when any source is processing backlog".
> > - The JM collects the "processingBacklog" periodically from all source
> > operators and uses the newly added config value as appropriate.
> >
> > The challenge with this approach is that we need to define the semantics
> of
> > this "processingBacklog" metric and have all source operators
> > implement this metric. I am not sure we are able to do this yet without
> > having users explicitly provide this information on a per-source basis.
> >
> > Suppose the job reads from a bounded Kafka source, should it emit
> > "processingBacklog=true"? If yes, then the job might use long
> checkpointing
> > interval even
> > if the job is asked to process data starting from now to the next 1 hour.
> > If no, then the job might use the short checkpointing interval
> > even if the job is asked to re-process data starting from 7 days ago.
> >
> >
> > >
> > > > - How can users specify the
> > > fastCheckpointInterval/slowCheckpointInterval?
> > > > For example, will we provide APIs on the CheckpointTrigger that
> > end-users
> > > > can use to specify the checkpointing interval? What would that look
> > like?
> > >
> > > Also as I mentioned before, just like metric reporters are configured:
> > >
> > > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> > > Every CheckpointTrigger could have its own custom configuration.
> > >
> > > > Overall, my gut feel is that the alternative approach based on
> > > > CheckpointTrigger is more complicated
> > >
> > > Yes, as usual, more generic things are more complicated, but often more
> > > useful in the long run.
> > >
> > > > and harder to use.
> > >
> > > I don't agree. Why setting in config
> > >
> > > execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
> > > execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: 1s
> > > execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: 30s
> > >
> > > that we could even provide a shortcut to the above construct via:
> > >
> > > execution.checkpointing.fast-interval: 1s
> > > execution.checkpointing.slow-interval: 30s
> > >
> > > is harder compared to setting two/three checkpoint intervals, one in
> the
> > > config/or via `env.enableCheckpointing(x)`,
> > > secondly passing one/two (fast/slow) values on the source itself?
> > >
> >
> > If we can address the use-case by providing just the two job-level config
> > as described above, I agree it will indeed be simpler.
> >
> > I have tried to achieve this goal. But the caveat is that it requires
> much
> > more work than described above in order to give the configs well-defined
> > semantics. So I find it simpler to just use the approach in FLIP-309.
> >
> > Let me explain my concern below. It will be great if you or someone else
> > can help provide a solution.
> >
> > 1) We need to clearly document when the fast-interval and slow-interval
> > will be used so that users can derive the expected behavior of the job
> and
> > be able to config these values.
> >
> > 2) The trigger of fast/slow interval depends on the behavior of the
> source
> > (e.g. MySQL CDC, HybridSource). However, no existing concepts of source
> > operator (e.g. boundedness) can describe the target behavior. For
> example,
> > MySQL CDC internally has two phases, namely snapshot phase and binlog
> > phase, which are not explicitly exposed to its users via source operator
> > API. And we probably should not enumerate all internal phases of all
> source
> > operators that are affected by fast/slow interval.
> >
> > 3) An alternative approach might be to define a new concept (e.g.
> > processingBacklog) that is applied to all source operators. Then the
> > fast/slow interval's documentation can depend on this concept. That means
> > we have to add a top-level concept (similar to source boundedness) and
> > require all source operators to specify how they enforce this concept
> (e.g.
> > FileSystemSource always emits processingBacklog=true). And there might be
> > cases where the source itself (e.g. a bounded Kafka Source) can not
> > automatically derive the value of this concept, in which case we need to
> > provide option for users to explicitly specify the value for this concept
> > on a per-source basis.
> >
> >
> >
> > > > And it probably also has the issues of "having two places to
> configure
> > > checkpointing
> > > > interval" and "giving flexibility for every source to implement a
> > > different
> > > > API" (as mentioned below).
> > >
> > > No, it doesn't.
> > >
> > > > IMO, it is a hard-requirement for the user-facing API to be
> > > > clearly defined and users should be able to use the API without
> concern
> > > of
> > > > regression. And this requirement is more important than the other
> goals
> > > > discussed above because it is related to the stability/performance of
> > the
> > > > production job. What do you think?
> > >
> > > I don't agree with this. There are many things that work something in
> > > between perfectly and well enough
> > > in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%),
> > while
> > > still being very useful.
> > > Good examples are: selection of state backend, unaligned checkpoints,
> > > buffer debloating but frankly if I go
> > > through list of currently available config options, something like half
> > of
> > > them can cause regressions. Heck,
> > > even Flink itself doesn't work perfectly in 100% of the use cases, due
> > to a
> > > variety of design choices. Of
> > > course, the more use cases are fine with said feature, the better, but
> we
> > > shouldn't fixate to perfectly cover
> > > 100% of the cases, as that's impossible.
> > >
> > > In this particular case, if back pressure monitoring trigger can work
> > well
> > > enough in 95% of cases, I would
> > > say that's already better than the originally proposed alternative,
> which
> > > doesn't work at all if user has a large
> > > backlog to reprocess from Kafka, including when using HybridSource
> AFTER
> > > the switch to Kafka has
> > > happened. For the remaining 5%, we should try to improve the behaviour
> > over
> > > time, but ultimately, users can
> > > decide to just run a fixed checkpoint interval (or at worst use the
> hacky
> > > checkpoint trigger that I mentioned
> > > before a couple of times).
> > >
> > > Also to be pedantic, if a user naively selects slow-interval in your
> > > proposal to 30 minutes, when that user's
> > > job fails on average every 15-20 minutes, his job can end up in a state
> > that
> > > it can not make any progress,
> > > this arguably is quite serious regression.
> > >
> >
> > I probably should not say it is "hard requirement". After all there are
> > pros/cons. We will need to consider implementation complexity, usability,
> > extensibility etc.
> >
> > I just don't think we should take it for granted to introduce regression
> > for one use-case in order to support another use-case. If we can not find
> > an algorithm/solution that addresses
> > both use-cases well, I hope we can be open to tackling them separately so
> > that users can choose the option that best fits their needs.
> >
> > All else being equal, I think it is preferable for the user-facing API to
> > be clearly defined, and users should be able to use the API without
> > concern of regression.
> >
> > Maybe we can list pros/cons for the alternative approaches we have been
> > discussing and choose the best approach. And maybe we will end up
> > finding that the use-case
> > which needs CheckpointTrigger can be tackled separately from the use-case
> > in FLIP-309.
> >
> >
> > > > I am not sure if there is a typo. Because if
> > backPressuredTimeMsPerSecond
> > > =
> > > > 0, then maxRecordsConsumedWithoutBackpressure =
> numRecordsInPerSecond /
> > > > 1000 * metricsUpdateInterval according to the above algorithm.
> > > >
> > > > Do you mean "maxRecordsConsumedWithoutBackpressure =
> > > (numRecordsInPerSecond
> > > > / (1 - backPressuredTimeMsPerSecond / 1000)) *
> metricsUpdateInterval"?
> > >
> > > It looks like there is indeed some mistake in my proposal above. Yours
> > > looks more correct; it probably
> > > still needs some safeguard/special handling if
> > > `backPressuredTimeMsPerSecond > 950`
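
As an illustration of the corrected formula and of one possible safeguard,
here is a minimal sketch. Two things in it are assumptions rather than
anything agreed in this thread: metricsUpdateInterval is taken to be in
seconds, and the safeguard is implemented by clamping
backPressuredTimeMsPerSecond to 950 (other handling, such as skipping the
sample, would be equally plausible).

```python
def max_records_consumed_without_backpressure(
    num_records_in_per_second: float,
    back_pressured_time_ms_per_second: float,
    metrics_update_interval_sec: float,
    max_back_pressured_ms: float = 950.0,
) -> float:
    """Estimate records consumable per metrics interval absent backpressure."""
    # Clamp the metric so the denominator below never gets close to zero.
    clamped_ms = min(
        max(back_pressured_time_ms_per_second, 0.0), max_back_pressured_ms
    )
    # Fraction of each second the subtask was NOT backpressured.
    free_fraction = 1.0 - clamped_ms / 1000.0
    # Scale the observed rate up to what it would be with no backpressure.
    return (num_records_in_per_second / free_fraction) * metrics_update_interval_sec


if __name__ == "__main__":
    # 100 records/s observed while backpressured half of the time, 10 s interval.
    print(max_records_consumed_without_backpressure(100.0, 500.0, 10.0))
```
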
> > >
> > > > The only information it can access is the backlog.
> > >
> > > Again no. It can access whatever we want to provide to it.
> > >
> > > Regarding the rest of your concerns. It's a matter of tweaking the
> > > parameters and the algorithm itself,
> > > and how much safety-net do we want to have. Ultimately, I'm pretty sure
> > > that's a (for 95-99% of cases)
> > > solvable problem. If not, there is always the hacky solution, that
> could
> > be
> > > even integrated into this above
> > > mentioned algorithm as a short circuit to always reach `slow-interval`.
> > >
> > > Apart from that, you picked 3 minutes as the checkpointing interval in
> your
> > > counter example. In most cases
> > > any interval above 1 minute would inflict pretty negligible overheads,
> so
> > > all in all, I would doubt there is
> > > a significant benefit (in most cases) of increasing 3 minute checkpoint
> > > interval to anything more, let alone
> > > 30 minutes.
> > >
> >
> > I am not sure we should design the algorithm with the assumption that the
> > short checkpointing interval will always be higher than 1 minute etc.
> >
> > I agree the proposed algorithm can solve most cases where the resource is
> > sufficient and there is never any backlog in source subtasks. On the other
> > hand, what makes SRE
> > life hard is probably the remaining 1-5% cases where the traffic is spiky
> > and the cluster is reaching its capacity limit. The ability to predict and
> > control a Flink job's behavior (including checkpointing interval) can
> > considerably reduce the burden of managing Flink jobs.
> >
> > Best,
> > Dong
> >
> >
> > >
> > > Best,
> > > Piotrek
> > >
> > >
> > >
> > >
> > >
> > > sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a):
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for the explanations. I have some followup questions below.
> > > >
> > > > On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org
> >
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > > Thanks for chiming in on the discussion, Ahmed!
> > > > >
> > > > > Regarding using the REST API. Currently I'm leaning towards
> > > implementing
> > > > > this feature inside the Flink itself, via some pluggable interface.
> > > > > REST API solution would be tempting, but I guess not everyone is
> > using
> > > > > Flink Kubernetes Operator.
> > > > >
> > > > > @Dong
> > > > >
> > > > > > I am not sure metrics such as isBackPressured are already sent to
> > JM.
> > > > >
> > > > > Fetching code path on the JM:
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> > > > >
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
> > > > >
> > > > > Example code path accessing Task level metrics via JM using the
> > > > > `MetricStore`:
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
> > > > >
> > > >
> > > > Thanks for the code reference. I checked the code that invoked these
> > two
> > > > classes and found the following information:
> > > >
> > > > - AggregatingSubtasksMetricsHandler#getStores is currently invoked only
> > > > when AggregatingJobsMetricsHandler is invoked.
> > > > - AggregatingJobsMetricsHandler is only instantiated and returned by
> > > > WebMonitorEndpoint#initializeHandlers
> > > > - WebMonitorEndpoint#initializeHandlers is only used by
> > > RestServerEndpoint.
> > > > And RestServerEndpoint invokes these handlers in response to external
> > > REST
> > > > request.
> > > >
> > > > I understand that JM will get the backpressure-related metrics every
> > time
> > > > the RestServerEndpoint receives the REST request to get these
> metrics.
> > > But
> > > > I am not sure if RestServerEndpoint is already always receiving the
> > REST
> > > > metrics at regular interval (suppose there is no human manually
> > > > opening/clicking the Flink Web UI). And if it does, what is the
> > interval?
> > > >
> > > >
> > > >
> > > > > > For example, let's say every source operator subtask reports this
> > > > metric
> > > > > to
> > > > > > JM once every 10 seconds. There are 100 source subtasks. And each
> > > > subtask
> > > > > > is backpressured roughly 10% of the total time due to traffic
> > spikes
> > > > (and
> > > > > > limited buffer). Then at any given time, there are 1 - 0.9^100 =
> > > > 99.997%
> > > > > > chance that there is at least one subtask that is backpressured.
> > Then
> > > > we
> > > > > > have to wait for at least 10 seconds to check again.
> > > > >
> > > > > backPressuredTimeMsPerSecond and other related metrics (like
> > > > > busyTimeMsPerSecond) are not subject to that problem.
> > > > > They are recalculated once every metric fetching interval, and they
> > > > report
> > > > > accurately on average the given subtask spent
> > > busy/idling/backpressured.
> > > > > In your example, backPressuredTimeMsPerSecond would report 100ms/s.
> > > >
> > > >
> > > > Suppose every subtask is already reporting
> backPressuredTimeMsPerSecond
> > > to
> > > > JM once every 100 ms. If a job has 10 operators (that are not
> chained)
> > > and
> > > > each operator has 100 subtasks, then JM would need to handle 10000
> > > requests
> > > > per second to receive metrics from these 1000 subtasks. It seems
> like a
> > > > non-trivial overhead for medium-to-large sized jobs and can make JM
> the
> > > > performance bottleneck during job execution.
> > > >
> > > > I would be surprised if Flink is already paying this much overhead
> just
> > > for
> > > > metrics monitoring. That is the main reason I still doubt it is true.
> > Can
> > > > you show where this 100 ms is currently configured?
> > > >
> > > > Alternatively, maybe you mean that we should add extra code to invoke
> > the
> > > > REST API at 100 ms interval. Then that means we need to considerably
> > > > increase the network/cpu overhead at JM, where the overhead will
> > increase
> > > > as the number of TM/slots increase, which may pose risk to the
> > > scalability
> > > > of the proposed design. I am not sure we should do this. What do you
> > > think?
> > > >
> > > >
> > > > >
> > > > > > While it will be nice to support additional use-cases
> > > > > > with one proposal, it is probably also reasonable to make
> > incremental
> > > > > > progress and support the low-hanging-fruit use-case first. The
> > choice
> > > > > > really depends on the complexity and the importance of supporting
> > the
> > > > > extra
> > > > > > use-cases.
> > > > >
> > > > > That would be true, if that was a private implementation detail or
> if
> > > the
> > > > > low-hanging-fruit-solution would be on the direct path to the final
> > > > > solution.
> > > > > That's unfortunately not the case here. This will add public facing
> > > API,
> > > > > that we will later need to maintain, no matter what the final
> > solution
> > > > will
> > > > > be,
> > > > > and at the moment at least I don't see it being related to a
> > "perfect"
> > > > > solution.
> > > >
> > > >
> > > > Sure. Then let's decide the final solution first.
> > > >
> > > >
> > > > > > I guess the point is that the suggested approach, which
> dynamically
> > > > > > determines the checkpointing interval based on the backpressure,
> > may
> > > > > cause
> > > > > > regression when the checkpointing interval is relatively low.
> This
> > > > makes
> > > > > it
> > > > > > hard for users to enable this feature in production. It is like
> an
> > > > > > auto-driving system that is not guaranteed to work
> > > > >
> > > > > Yes, creating a more generic solution that would require less
> > > > configuration
> > > > > > is usually more difficult than static configurations.
> > > > > It doesn't mean we shouldn't try to do that. Especially that if my
> > > > proposed
> > > > > algorithm wouldn't work good enough, there is
> > > > > an obvious solution, that any source could add a metric, like let
> say
> > > > > "processingBacklog: true/false", and the `CheckpointTrigger`
> > > > > could use this as an override to always switch to the
> > > > > "slowCheckpointInterval". I don't think we need it, but that's
> always
> > > an
> > > > > option
> > > > > that would be basically equivalent to your original proposal. Or
> even
> > > > > source could add "suggestedCheckpointInterval : int", and
> > > > > `CheckpointTrigger` could use that value if present as a hint in
> one
> > > way
> > > > or
> > > > > another.
> > > > >
> > > >
> > > > So far we have talked about the possibility of using CheckpointTrigger
> > > > and mentioned that the CheckpointTrigger
> > > > can read metric values.
> > > >
> > > > Can you help answer the following questions so that I can understand
> > the
> > > > alternative solution more concretely:
> > > >
> > > > - What is the interface of this CheckpointTrigger? For example, are
> we
> > > > going to give CheckpointTrigger a context that it can use to fetch
> > > > arbitrary metric values? This can help us understand what information
> > > this
> > > > user-defined CheckpointTrigger can use to make the checkpoint
> decision.
> > > > - Where is this CheckpointTrigger running? For example, is it going
> to
> > > run
> > > > on the subtask of every source operator? Or is it going to run on the
> > JM?
> > > > - Are we going to provide a default implementation of this
> > > > CheckpointTrigger in Flink that implements the algorithm described
> > below,
> > > > or do we expect each source operator developer to implement their own
> > > > CheckpointTrigger?
> > > > - How can users specify the
> > > fastCheckpointInterval/slowCheckpointInterval?
> > > > For example, will we provide APIs on the CheckpointTrigger that
> > end-users
> > > > can use to specify the checkpointing interval? What would that look
> > like?
> > > >
> > > > Overall, my gut feel is that the alternative approach based on
> > > > CheckpointTrigger is more complicated and harder to use. And it
> > probably
> > > > also has the issues of "having two places to configure checkpointing
> > > > interval" and "giving flexibility for every source to implement a
> > > different
> > > > API" (as mentioned below).
> > > >
> > > > Maybe we can evaluate it more after knowing the answers to the above
> > > > questions.
> > > >
> > > >
> > > >
> > > > >
> > > > > > On the other hand, the approach currently proposed in the FLIP is
> > > much
> > > > > > simpler as it does not depend on backpressure. Users specify the
> > > extra
> > > > > > interval requirement on the specific sources (e.g. HybridSource,
> > > MySQL
> > > > > CDC
> > > > > > Source) and can easily know the checkpointing interval will be
> used
> > > on
> > > > > the
> > > > > > continuous phase of the corresponding source. This is pretty much
> > > same
> > > > as
> > > > > > how users use the existing execution.checkpointing.interval
> config.
> > > So
> > > > > > there is no extra concern of regression caused by this approach.
> > > > >
> > > > > To an extent, but as I have already previously mentioned I really
> > > really
> > > > do
> > > > > not like idea of:
> > > > >   - having two places to configure checkpointing interval (config
> > file
> > > > and
> > > > > in the Source builders)
> > > > >   - giving flexibility for every source to implement a different
> API
> > > for
> > > > > that purpose
> > > > >   - creating a solution that is not generic enough, so that we will
> > > need
> > > > a
> > > > > completely different mechanism in the future anyway
> > > > >
> > > >
> > > > Yeah, I understand different developers might have different
> > > > concerns/tastes for these APIs. Ultimately, there might not be a
> > perfect
> > > > solution and we have to choose based on the pros/cons of these
> > solutions.
> > > >
> > > > I agree with you that, all things being equal, it is preferable to 1)
> > > > have one place to configure checkpointing intervals, 2) have all source
> > > > operators use the same API, and 3) create a solution that is generic
> > > > and long lasting. Note that these three goals affect the usability and
> > > > extensibility of the API, but not necessarily the
> > > > stability/performance of the production job.
> > > >
> > > > BTW, there are also other preferable goals. For example, it is very
> > > > useful for the job's behavior to be predictable and interpretable so
> > > > that SREs can operate/debug the Flink job more easily. We can list
> > > > these pros/cons altogether later.
> > > >
> > > > I am wondering if we can first agree on the priority of goals we want
> > to
> > > > achieve. IMO, it is a hard-requirement for the user-facing API to be
> > > > clearly defined and users should be able to use the API without
> concern
> > > of
> > > > regression. And this requirement is more important than the other
> goals
> > > > discussed above because it is related to the stability/performance of
> > the
> > > > production job. What do you think?
> > > >
> > > >
> > > > >
> > > > > > Sounds good. Looking forward to learning more ideas.
> > > > >
> > > > > > I have thought about this a bit more, and I think we don't need to
> > > > > > check for the backpressure status, or how overloaded all of the
> > > > > > operators are.
> > > > > > We could just check three things for source operators:
> > > > > 1. pendingRecords (backlog length)
> > > > > 2. numRecordsInPerSecond
> > > > > 3. backPressuredTimeMsPerSecond
> > > > >
> > > > > // int metricsUpdateInterval = 10s // obtained from config
> > > > > > // Next line calculates how many records we can consume from the
> > > > > > // backlog, assuming that magically the reason behind the backpressure
> > > > > > // vanishes. We will use this only as a safeguard against scenarios
> > > > > > // such as backpressure caused by some intermittent
> > > > > > // failure/performance degradation.
> > > > > > maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond /
> > > > > > (1000 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
> > > > >
> > > >
> > > > I am not sure if there is a typo. Because if
> > > backPressuredTimeMsPerSecond =
> > > > 0, then maxRecordsConsumedWithoutBackpressure =
> numRecordsInPerSecond /
> > > > 1000 * metricsUpdateInterval according to the above algorithm.
> > > >
> > > > Do you mean "maxRecordsConsumedWithoutBackpressure =
> > > (numRecordsInPerSecond
> > > > / (1 - backPressuredTimeMsPerSecond / 1000)) *
> metricsUpdateInterval"?
> > > >
> > > >
> > > > >
> > > > > > // we are excluding maxRecordsConsumedWithoutBackpressure from the
> > > > > > // backlog as a safeguard against intermittent backpressure problems,
> > > > > > // so that we don't calculate the next checkpoint interval far in the
> > > > > > // future while the backpressure goes away before we recalculate the
> > > > > > // metrics and the new checkpointing interval
> > > > > timeToConsumeBacklog = (pendingRecords -
> > > > > maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
> > > > >
> > > > >
> > > > > > Then we can use those numbers to calculate the desired checkpointing
> > > > > > interval, for example like this:
> > > > > >
> > > > > > // the divisor of 10 may need some refining
> > > > > > long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
> > > > > > long nextCheckpointInterval = min(max(fastCheckpointInterval,
> > > > > >     calculatedCheckpointInterval), slowCheckpointInterval);
> > > > > > long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
> > > > >
> > > > > WDYT?
> > > >
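For reference, the quoted calculation can be put together as a minimal Java sketch. It assumes the corrected denominator (1 - backPressuredTimeMsPerSecond / 1000) suggested in the reply above, takes metricsUpdateInterval in seconds, and returns milliseconds. The class name, the method name, and the fallback for a zero input rate are hypothetical and not part of the proposal or of any Flink API.

```java
final class CheckpointIntervalSketch {

    // Hypothetical helper: picks the next checkpoint interval (ms) from source metrics.
    static long nextCheckpointIntervalMs(
            long pendingRecords,
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSec,
            long fastCheckpointIntervalMs,
            long slowCheckpointIntervalMs) {
        if (numRecordsInPerSecond <= 0.0) {
            // Assumption (not from the thread): with no throughput signal, stay on the fast interval.
            return fastCheckpointIntervalMs;
        }
        // Fraction of each second the source was not backpressured, clamped at 0.
        double notBackpressured =
                Math.max(0.0, 1.0 - backPressuredTimeMsPerSecond / 1000.0);
        // Records consumable in one metrics window if the backpressure vanished.
        // Dividing by 0.0 yields +Infinity, which cancels the whole backlog below.
        double maxRecordsConsumedWithoutBackpressure =
                numRecordsInPerSecond / notBackpressured * metricsUpdateIntervalSec;
        // Remaining backlog expressed as seconds of work at the current rate.
        double timeToConsumeBacklogSec = Math.max(
                0.0,
                (pendingRecords - maxRecordsConsumedWithoutBackpressure)
                        / numRecordsInPerSecond);
        // One tenth of the drain time, as in the quoted pseudocode.
        long calculatedCheckpointIntervalMs =
                (long) (timeToConsumeBacklogSec * 1000.0 / 10.0);
        // Clamp between the fast and slow intervals.
        return Math.min(
                Math.max(fastCheckpointIntervalMs, calculatedCheckpointIntervalMs),
                slowCheckpointIntervalMs);
    }
}
```

With a rate of 1000 records/s, no backpressure, a 10 s metrics window, and fast/slow intervals of 3 and 30 minutes, a backlog worth 30 minutes stays on the fast interval, while a backlog worth 10 hours is capped at the slow interval.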
> > > >
> > > > I think the idea of the above algorithm is to favor the
> > > > fastCheckpointInterval unless we are very sure the backlog will take a
> > > > long time to process. This can alleviate the concern of regression
> > > > during the continuous_unbounded phase since we are more likely to use
> > > > the fastCheckpointInterval. However, it can cause regression during the
> > > > bounded phase.
> > > >
> > > > I will use a concrete example to explain the risk of regression:
> > > > - The user is using HybridSource to read from HDFS followed by Kafka.
> > The
> > > > data in HDFS is old and there is no need for data freshness for the
> > data
> > > in
> > > > HDFS.
> > > > - The user configures the job as below:
> > > >   - fastCheckpointInterval = 3 minutes
> > > >   - slowCheckpointInterval = 30 minutes
> > > >   - metricsUpdateInterval = 100 ms
> > > >
> > > > Using the above formula, we know that once pendingRecords
> > > > <= numRecordsInPerSecond * 30-minutes, then
> > > > calculatedCheckpointInterval <= 3 minutes, meaning that we will use
> > > > fastCheckpointInterval as the checkpointing interval. Then in the last
> > > > 30 minutes of the bounded phase, the checkpointing frequency will be 10X
> > > > higher than what the user wants.
> > > >
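The 30-minute figure follows from the divisor of 10 in the quoted pseudocode, which a few lines of Java can make explicit. This is only a sketch: it ignores the maxRecordsConsumedWithoutBackpressure safeguard term, and the class and method names are hypothetical.

```java
import java.time.Duration;

final class BacklogThresholdSketch {

    // Divisor taken from "timeToConsumeBacklog / 10" in the quoted pseudocode.
    static final int DIVISOR = 10;

    // Backlog drain time up to which the fast interval is still selected.
    static Duration fastIntervalUpTo(Duration fastCheckpointInterval) {
        return fastCheckpointInterval.multipliedBy(DIVISOR);
    }

    // Backlog drain time from which the slow interval is fully reached.
    static Duration slowIntervalFrom(Duration slowCheckpointInterval) {
        return slowCheckpointInterval.multipliedBy(DIVISOR);
    }
}
```

For fastCheckpointInterval = 3 minutes and slowCheckpointInterval = 30 minutes, the fast interval applies whenever the backlog can be drained within 30 minutes, and the full slow interval is only reached once the backlog needs 300 minutes or more.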
> > > > Also note that the same issue would also considerably limit the
> > benefits
> > > of
> > > > the algorithm. For example, during the continuous phase, the
> algorithm
> > > will
> > > > only be better than the approach in FLIP-309 when there is at least
> > > > 30-minutes worth of backlog in the source.
> > > >
> > > > Sure, having a slower checkpointing interval in this extreme case
> > > > (where there is a 30-minute backlog in the continuous-unbounded phase)
> > > > is still useful when it happens. But since this is the uncommon case,
> > > > and the right solution is probably to do capacity planning to prevent
> > > > it from happening in the first place, I am not sure it is worth
> > > > optimizing for this case at the cost of regression in the bounded phase
> > > > and the reduced operational predictability for users (e.g. what
> > > > checkpointing interval should I expect at this stage of the job).
> > > >
> > > > I think the fundamental issue with this algorithm is that it is applied
> > > > to both the bounded phases and the continuous_unbounded phases without
> > > > knowing which phase the job is running in. The only information it can
> > > > access is the backlog. But two sources with the same amount of backlog
> > > > do not necessarily have the same data freshness requirement.
> > > >
> > > > In this particular example, users know that the data in HDFS is very
> > > > old and there is no need for data freshness. Users can express this
> > > > signal via the per-source API proposed in the FLIP. This is why the
> > > > current approach in FLIP-309 can be better in this case.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > >
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > >
> > > >
> > >
> >
>
