Thanks Jark, Jingsong, and Zhu for the review!

Thanks Zhu for the suggestion. I have updated the configuration name as
suggested.
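
For reference, here is a minimal sketch of how the renamed option might sit
next to the regular interval in the Flink configuration. The values are
illustrative assumptions only, not recommendations from the FLIP:

```yaml
# Interval used during normal (non-backlog) processing.
execution.checkpointing.interval: 30s
# Interval used while a source reports that it is processing backlog
# (illustrative value).
execution.checkpointing.interval-during-backlog: 10min
```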

On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com> wrote:

> Thanks Dong and Yunfeng for creating this FLIP and driving this discussion.
>
> The new design looks generally good to me. Increasing the checkpoint
> interval when the job is processing backlogs is easier for users to
> understand and can help in more scenarios.
>
> I have one comment about the new configuration.
> Naming the new configuration
> "execution.checkpointing.interval-during-backlog" would be better
> according to Flink config naming convention.
> Another reason is that nested config keys should be avoided. See
> FLINK-29372 for more details.
>
> Thanks,
> Zhu
>
> On Tue, Jun 27, 2023 at 15:45 Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > Looks good to me!
> >
> > Thanks Dong, Yunfeng and all for your discussion and design.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <imj...@gmail.com> wrote:
> > >
> > > Thank you Dong for driving this FLIP.
> > >
> > > The new design looks good to me!
> > >
> > > Best,
> > > Jark
> > >
> > > > 2023年6月27日 14:38,Dong Lin <lindon...@gmail.com> 写道:
> > > >
> > > > Thank you Leonard for the review!
> > > >
> > > > Hi Piotr, do you have any comments on the latest proposal?
> > > >
> > > > I am wondering if it is OK to start the voting thread this week.
> > > >
> > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com>
> wrote:
> > > >
> > > >> Thanks Dong for driving this FLIP forward!
> > > >>
> > > > >> Introducing the `backlog status` concept for Flink jobs makes sense to me
> > > > >> for the following reasons:
> > > >>
> > > > >> From a concept/API design perspective, it's more general and natural than
> > > > >> the proposals above, as it can be used in HybridSource for bounded records,
> > > > >> CDC Source for history snapshot, and general sources like KafkaSource for
> > > > >> historical messages.
> > > >>
> > > > >> From a use cases/requirements perspective, I've seen many users manually
> > > > >> set a larger checkpoint interval during backfilling and then set a shorter
> > > > >> checkpoint interval for real-time processing in their production environments
> > > > >> as a Flink application optimization. Now, the Flink framework can apply this
> > > > >> optimization without requiring the user to set the checkpoint interval and
> > > > >> restart the job multiple times.
> > > >>
> > > > >> After supporting a larger checkpoint interval for jobs under backlog status
> > > > >> in the current FLIP, we can explore supporting larger parallelism/memory/CPU
> > > > >> for jobs under backlog status in the future.
> > > >>
> > > >> In short, the updated FLIP looks good to me.
> > > >>
> > > >>
> > > >> Best,
> > > >> Leonard
> > > >>
> > > >>
> > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >>>
> > > >>> Hi Piotr,
> > > >>>
> > > >>> Thanks again for proposing the isProcessingBacklog concept.
> > > >>>
> > > >>> After discussing with Becket Qin and thinking about this more, I
> agree it
> > > >>> is a better idea to add a top-level concept to all source
> operators to
> > > >>> address the target use-case.
> > > >>>
> > > > >>> The main reason that changed my mind is that isProcessingBacklog can be
> > > > >>> described as an inherent/natural attribute of every source instance, and its
> > > > >>> semantics do not need to depend on any specific checkpointing policy.
> > > >>> Also, we can hardcode the isProcessingBacklog behavior for the
> sources we
> > > >>> have considered so far (e.g. HybridSource and MySQL CDC source)
> without
> > > >>> asking users to explicitly configure the per-source behavior, which
> > > >> indeed
> > > >>> provides better user experience.
> > > >>>
> > > >>> I have updated the FLIP based on the latest suggestions. The
> latest FLIP
> > > >> no
> > > >>> longer introduces per-source config that can be used by end-users.
> While
> > > >> I
> > > >>> agree with you that CheckpointTrigger can be a useful feature to
> address
> > > >>> additional use-cases, I am not sure it is necessary for the
> use-case
> > > >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger
> separately
> > > >>> in another FLIP?
> > > >>>
> > > >>> Can you help take another look at the updated FLIP?
> > > >>>
> > > >>> Best,
> > > >>> Dong
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <
> pnowoj...@apache.org>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Dong,
> > > >>>>
> > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> > > >>>>> "backpressured" at a given time (due to random traffic spikes).
> Then at
> > > >>>> any
> > > >>>>> given time, the chance of the job
> > > >>>>> being considered not-backpressured = (1-0.01)^1000. Since we
> evaluate
> > > >> the
> > > >>>>> backpressure metric once a second, the estimated time for the job
> > > >>>>> to be considered not-backpressured is roughly 1 /
> ((1-0.01)^1000) =
> > > >> 23163
> > > >>>>> sec = 6.4 hours.
> > > >>>>>
> > > >>>>> This means that the job will effectively always use the longer
> > > >>>>> checkpointing interval. It looks like a real concern, right?
> > > >>>>
> > > >>>> Sorry I don't understand where you are getting those numbers from.
> > > >>>> Instead of trying to find loophole after loophole, could you try
> to
> > > >> think
> > > >>>> how a given loophole could be improved/solved?
> > > >>>>
> > > >>>>> Hmm... I honestly think it will be useful to know the APIs due
> to the
> > > >>>>> following reasons.
> > > >>>>
> > > >>>> Please propose something. I don't think it's needed.
> > > >>>>
> > > >>>>> - For the use-case mentioned in FLIP-309 motivation section,
> would the
> > > >>>> APIs
> > > >>>>> of this alternative approach be more or less usable?
> > > >>>>
> > > >>>> Everything that you originally wanted to achieve in FLIP-309, you
> could
> > > >> do
> > > >>>> as well in my proposal.
> > > >>>> Vide my many mentions of the "hacky solution".
> > > >>>>
> > > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow
> > > >>>>> checkpointing interval to change dynamically even during the
> unbounded
> > > >>>>> phase) as it claims?
> > > >>>>
> > > >>>> I don't see why not.
> > > >>>>
> > > >>>>> - Can these APIs be decoupled from the APIs currently proposed in
> > > >>>> FLIP-309?
> > > >>>>
> > > >>>> Yes
> > > >>>>
> > > >>>>> For example, if the APIs of this alternative approach can be
> decoupled
> > > >>>> from
> > > >>>>> the APIs currently proposed in FLIP-309, then it might be
> reasonable to
> > > >>>>> work on this extra use-case with a more advanced/complicated
> design
> > > >>>>> separately in a followup work.
> > > >>>>
> > > >>>> As I voiced my concerns previously, the current design of
> FLIP-309 would
> > > >>>> clog the public API and in the long run confuse the users. IMO
> It's
> > > >>>> addressing the
> > > >>>> problem in the wrong place.
> > > >>>>
> > > >>>>> Hmm.. do you mean we can do the following:
> > > >>>>> - Have all source operators emit a metric named
> "processingBacklog".
> > > >>>>> - Add a job-level config that specifies "the checkpointing
> interval to
> > > >> be
> > > >>>>> used when any source is processing backlog".
> > > >>>>> - The JM collects the "processingBacklog" periodically from all
> source
> > > >>>>> operators and uses the newly added config value as appropriate.
> > > >>>>
> > > >>>> Yes.
> > > >>>>
> > > >>>>> The challenge with this approach is that we need to define the
> > > >> semantics
> > > >>>> of
> > > >>>>> this "processingBacklog" metric and have all source operators
> > > >>>>> implement this metric. I am not sure we are able to do this yet
> without
> > > >>>>> having users explicitly provide this information on a per-source
> basis.
> > > >>>>>
> > > > >>>>> Suppose the job reads from a bounded Kafka source, should it emit
> > > >>>>> "processingBacklog=true"? If yes, then the job might use long
> > > >>>> checkpointing
> > > >>>>> interval even
> > > >>>>> if the job is asked to process data starting from now to the
> next 1
> > > >> hour.
> > > >>>>> If no, then the job might use the short checkpointing interval
> > > >>>>> even if the job is asked to re-process data starting from 7 days
> ago.
> > > >>>>
> > > >>>> Yes. The same can be said of your proposal. Your proposal has the
> very
> > > >> same
> > > >>>> issues
> > > >>>> that every source would have to implement it differently, most
> sources
> > > >>>> would
> > > >>>> have no idea how to properly calculate the new requested
> checkpoint
> > > >>>> interval,
> > > >>>> for those that do know how to do that, user would have to
> configure
> > > >> every
> > > >>>> source
> > > >>>> individually and yet again we would end up with a system, that
> works
> > > >> only
> > > >>>> partially in
> > > >>>> some special use cases (HybridSource), that's confusing the users
> even
> > > >>>> more.
> > > >>>>
> > > >>>> That's why I think the more generic solution, working primarily
> on the
> > > >> same
> > > > >>>> metrics that are used by various auto scaling solutions (like the Flink K8s
> > > > >>>> operator's autoscaler) would be better. I proposed the hacky solution to:
> > > >>>> 1. show you that the generic solution is simply a superset of your
> > > >> proposal
> > > >>>> 2. if you are adamant that busyness/backpressured/records
> processing
> > > >>>> rate/pending records
> > > >>>>   metrics wouldn't cover your use case sufficiently (imo they
> can),
> > > >> then
> > > >>>> you can very easily
> > > >>>>   enhance this algorithm with using some hints from the sources.
> Like
> > > >>>> "processingBacklog==true"
> > > >>>>   to short circuit the main algorithm, if `processingBacklog` is
> > > >>>> available.
> > > >>>>
> > > >>>> Best,
> > > >>>> Piotrek
> > > >>>>
> > > >>>>
> > > > >>>> On Fri, Jun 16, 2023 at 04:45 Dong Lin <lindon...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi again Piotr,
> > > >>>>>
> > > >>>>> Thank you for the reply. Please see my reply inline.
> > > >>>>>
> > > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> > > >>>> piotr.nowoj...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hi again Dong,
> > > >>>>>>
> > > >>>>>>> I understand that JM will get the backpressure-related metrics
> every
> > > >>>>> time
> > > >>>>>>> the RestServerEndpoint receives the REST request to get these
> > > >>>> metrics.
> > > >>>>>> But
> > > >>>>>>> I am not sure if RestServerEndpoint is already always
> receiving the
> > > >>>>> REST
> > > >>>>>>> metrics at regular interval (suppose there is no human manually
> > > >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the
> > > >>>>> interval?
> > > >>>>>>
> > > > >>>>>> Good catch, I thought that metrics are pre-emptively sent to the JM every 10
> > > > >>>>>> seconds.
> > > >>>>>> Indeed that's not the case at the moment, and that would have
> to be
> > > >>>>>> improved.
> > > >>>>>>
> > > >>>>>>> I would be surprised if Flink is already paying this much
> overhead
> > > >>>> just
> > > >>>>>> for
> > > >>>>>>> metrics monitoring. That is the main reason I still doubt it
> is true.
> > > >>>>> Can
> > > >>>>>>> you show where this 100 ms is currently configured?
> > > >>>>>>>
> > > >>>>>>> Alternatively, maybe you mean that we should add extra code to
> invoke
> > > >>>>> the
> > > >>>>>>> REST API at 100 ms interval. Then that means we need to
> considerably
> > > >>>>>>> increase the network/cpu overhead at JM, where the overhead
> will
> > > >>>>> increase
> > > >>>>>>> as the number of TM/slots increase, which may pose risk to the
> > > >>>>>> scalability
> > > >>>>>>> of the proposed design. I am not sure we should do this. What
> do you
> > > >>>>>> think?
> > > >>>>>>
> > > >>>>>> Sorry. I didn't mean metric should be reported every 100ms. I
> meant
> > > >>>> that
> > > >>>>>> "backPressuredTimeMsPerSecond (metric) would report (a value of)
> > > >>>>> 100ms/s."
> > > >>>>>> once per metric interval (10s?).
> > > >>>>>>
> > > >>>>>
> > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> > > >>>>> "backpressured" at a given time (due to random traffic spikes).
> Then at
> > > >>>> any
> > > >>>>> given time, the chance of the job
> > > >>>>> being considered not-backpressured = (1-0.01)^1000. Since we
> evaluate
> > > >> the
> > > >>>>> backpressure metric once a second, the estimated time for the job
> > > >>>>> to be considered not-backpressured is roughly 1 /
> ((1-0.01)^1000) =
> > > >> 23163
> > > >>>>> sec = 6.4 hours.
> > > >>>>>
> > > >>>>> This means that the job will effectively always use the longer
> > > >>>>> checkpointing interval. It looks like a real concern, right?
> > > >>>>>
> > > >>>>>
> > > >>>>>>> - What is the interface of this CheckpointTrigger? For
> example, are
> > > >>>> we
> > > >>>>>>> going to give CheckpointTrigger a context that it can use to
> fetch
> > > >>>>>>> arbitrary metric values? This can help us understand what
> information
> > > >>>>>> this
> > > >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint
> > > >>>> decision.
> > > >>>>>>
> > > >>>>>> I honestly don't think this is important at this stage of the
> > > >>>> discussion.
> > > >>>>>> It could have
> > > >>>>>> whatever interface we would deem to be best. Required things:
> > > >>>>>>
> > > >>>>>> - access to at least a subset of metrics that the given
> > > >>>>> `CheckpointTrigger`
> > > >>>>>> requests,
> > > >>>>>> for example via some registration mechanism, so we don't have to
> > > >>>> fetch
> > > >>>>>> all of the
> > > >>>>>> metrics all the time from TMs.
> > > >>>>>> - some way to influence `CheckpointCoordinator`. Either via
> manually
> > > >>>>>> triggering
> > > >>>>>> checkpoints, and/or ability to change the checkpointing
> interval.
> > > >>>>>>
> > > >>>>>
> > > >>>>> Hmm... I honestly think it will be useful to know the APIs due
> to the
> > > >>>>> following reasons.
> > > >>>>>
> > > >>>>> We would need to know the concrete APIs to gauge the following:
> > > >>>>> - For the use-case mentioned in FLIP-309 motivation section,
> would the
> > > >>>> APIs
> > > >>>>> of this alternative approach be more or less usable?
> > > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow
> > > >>>>> checkpointing interval to change dynamically even during the
> unbounded
> > > >>>>> phase) as it claims?
> > > >>>>> - Can these APIs be decoupled from the APIs currently proposed in
> > > >>>> FLIP-309?
> > > >>>>>
> > > >>>>> For example, if the APIs of this alternative approach can be
> decoupled
> > > >>>> from
> > > >>>>> the APIs currently proposed in FLIP-309, then it might be
> reasonable to
> > > >>>>> work on this extra use-case with a more advanced/complicated
> design
> > > >>>>> separately in a followup work.
> > > >>>>>
> > > >>>>>
> > > >>>>>>> - Where is this CheckpointTrigger running? For example, is it
> going
> > > >>>> to
> > > >>>>>> run
> > > >>>>>>> on the subtask of every source operator? Or is it going to run
> on the
> > > >>>>> JM?
> > > >>>>>>
> > > >>>>>> IMO on the JM.
> > > >>>>>>
> > > >>>>>>> - Are we going to provide a default implementation of this
> > > >>>>>>> CheckpointTrigger in Flink that implements the algorithm
> described
> > > >>>>> below,
> > > >>>>>>> or do we expect each source operator developer to implement
> their own
> > > >>>>>>> CheckpointTrigger?
> > > >>>>>>
> > > >>>>>> As I mentioned before, I think we should provide at the very
> least the
> > > >>>>>> implementation
> > > >>>>>> that replaces the current triggering mechanism (statically
> configured
> > > >>>>>> checkpointing interval)
> > > >>>>>> and it would be great to provide the backpressure monitoring
> trigger
> > > >> as
> > > >>>>>> well.
> > > >>>>>>
> > > >>>>>
> > > >>>>> I agree that if there is a good use-case that can be addressed
> by the
> > > >>>>> proposed CheckpointTrigger, then it is reasonable
> > > >>>>> to add CheckpointTrigger and replace the current triggering
> mechanism
> > > >>>> with
> > > >>>>> it.
> > > >>>>>
> > > >>>>> I also agree that we will likely find such a use-case. For
> example,
> > > >>>> suppose
> > > >>>>> the source records have event timestamps, then it is likely
> > > >>>>> that we can use the trigger to dynamically control the
> checkpointing
> > > >>>>> interval based on the difference between the watermark and
> current
> > > >> system
> > > >>>>> time.
> > > >>>>>
> > > >>>>> But I am not sure the addition of this CheckpointTrigger should
> be
> > > >>>> coupled
> > > >>>>> with FLIP-309. Whether or not it is coupled probably depends on
> the
> > > >>>>> concrete API design around CheckpointTrigger.
> > > >>>>>
> > > > >>>>>> If you would be adamant that the backpressure monitoring doesn't cover
> > > > >>>>>> your use case well enough, I would be ok to provide the hacky version that I
> > > > >>>>>> also mentioned before:
> > > >>>>>
> > > >>>>>
> > > >>>>>> """
> > > > >>>>>> Especially that if my proposed algorithm wouldn't work well enough, there is
> > > > >>>>>> an obvious solution, that any source could add a metric, like let's say
> > > >>>>>> "processingBacklog: true/false", and the `CheckpointTrigger`
> > > >>>>>> could use this as an override to always switch to the
> > > >>>>>> "slowCheckpointInterval". I don't think we need it, but that's
> always
> > > >>>> an
> > > >>>>>> option
> > > >>>>>> that would be basically equivalent to your original proposal.
> > > >>>>>> """
> > > >>>>>>
> > > >>>>>
> > > >>>>> Hmm.. do you mean we can do the following:
> > > >>>>> - Have all source operators emit a metric named
> "processingBacklog".
> > > >>>>> - Add a job-level config that specifies "the checkpointing
> interval to
> > > >> be
> > > >>>>> used when any source is processing backlog".
> > > >>>>> - The JM collects the "processingBacklog" periodically from all
> source
> > > >>>>> operators and uses the newly added config value as appropriate.
> > > >>>>>
> > > >>>>> The challenge with this approach is that we need to define the
> > > >> semantics
> > > >>>> of
> > > >>>>> this "processingBacklog" metric and have all source operators
> > > >>>>> implement this metric. I am not sure we are able to do this yet
> without
> > > >>>>> having users explicitly provide this information on a per-source
> basis.
> > > >>>>>
> > > > >>>>> Suppose the job reads from a bounded Kafka source, should it emit
> > > >>>>> "processingBacklog=true"? If yes, then the job might use long
> > > >>>> checkpointing
> > > >>>>> interval even
> > > >>>>> if the job is asked to process data starting from now to the
> next 1
> > > >> hour.
> > > >>>>> If no, then the job might use the short checkpointing interval
> > > >>>>> even if the job is asked to re-process data starting from 7 days
> ago.
> > > >>>>>
> > > >>>>>
> > > >>>>>>
> > > >>>>>>> - How can users specify the
> > > >>>>>> fastCheckpointInterval/slowCheckpointInterval?
> > > >>>>>>> For example, will we provide APIs on the CheckpointTrigger that
> > > >>>>> end-users
> > > >>>>>>> can use to specify the checkpointing interval? What would that
> look
> > > >>>>> like?
> > > >>>>>>
> > > > >>>>>> Also as I mentioned before, just like metric reporters are configured:
> > > > >>>>>>
> > > > >>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> > > >>>>>> Every CheckpointTrigger could have its own custom configuration.
> > > >>>>>>
> > > >>>>>>> Overall, my gut feel is that the alternative approach based on
> > > >>>>>>> CheckpointTrigger is more complicated
> > > >>>>>>
> > > >>>>>> Yes, as usual, more generic things are more complicated, but
> often
> > > >> more
> > > >>>>>> useful in the long run.
> > > >>>>>>
> > > >>>>>>> and harder to use.
> > > >>>>>>
> > > >>>>>> I don't agree. Why setting in config
> > > >>>>>>
> > > > >>>>>> execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
> > > > >>>>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: 1s
> > > > >>>>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: 30s
> > > > >>>>>>
> > > > >>>>>> that we could even provide a shortcut to the above construct via:
> > > >>>>>>
> > > >>>>>> execution.checkpointing.fast-interval: 1s
> > > >>>>>> execution.checkpointing.slow-interval: 30s
> > > >>>>>>
> > > >>>>>> is harder compared to setting two/three checkpoint intervals,
> one in
> > > >>>> the
> > > >>>>>> config/or via `env.enableCheckpointing(x)`,
> > > >>>>>> secondly passing one/two (fast/slow) values on the source
> itself?
> > > >>>>>>
> > > >>>>>
> > > >>>>> If we can address the use-case by providing just the two
> job-level
> > > >> config
> > > >>>>> as described above, I agree it will indeed be simpler.
> > > >>>>>
> > > >>>>> I have tried to achieve this goal. But the caveat is that it
> requires
> > > >>>> much
> > > >>>>> more work than described above in order to give the configs
> > > >> well-defined
> > > >>>>> semantics. So I find it simpler to just use the approach in
> FLIP-309.
> > > >>>>>
> > > >>>>> Let me explain my concern below. It will be great if you or
> someone
> > > >> else
> > > >>>>> can help provide a solution.
> > > >>>>>
> > > >>>>> 1) We need to clearly document when the fast-interval and
> slow-interval
> > > >>>>> will be used so that users can derive the expected behavior of
> the job
> > > >>>> and
> > > >>>>> be able to config these values.
> > > >>>>>
> > > >>>>> 2) The trigger of fast/slow interval depends on the behavior of
> the
> > > >>>> source
> > > >>>>> (e.g. MySQL CDC, HybridSource). However, no existing concepts of
> source
> > > >>>>> operator (e.g. boundedness) can describe the target behavior. For
> > > >>>> example,
> > > >>>>> MySQL CDC internally has two phases, namely snapshot phase and
> binlog
> > > >>>>> phase, which are not explicitly exposed to its users via source
> > > >> operator
> > > >>>>> API. And we probably should not enumerate all internal phases of
> all
> > > >>>> source
> > > >>>>> operators that are affected by fast/slow interval.
> > > >>>>>
> > > >>>>> 3) An alternative approach might be to define a new concept (e.g.
> > > >>>>> processingBacklog) that is applied to all source operators. Then
> the
> > > >>>>> fast/slow interval's documentation can depend on this concept.
> That
> > > >> means
> > > >>>>> we have to add a top-level concept (similar to source
> boundedness) and
> > > >>>>> require all source operators to specify how they enforce this
> concept
> > > >>>> (e.g.
> > > >>>>> FileSystemSource always emits processingBacklog=true). And there
> might
> > > >> be
> > > >>>>> cases where the source itself (e.g. a bounded Kafka Source) can
> not
> > > >>>>> automatically derive the value of this concept, in which case we
> need
> > > >> to
> > > >>>>> provide option for users to explicitly specify the value for this
> > > >> concept
> > > >>>>> on a per-source basis.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>>> And it probably also has the issues of "having two places to
> > > >>>> configure
> > > >>>>>> checkpointing
> > > >>>>>>> interval" and "giving flexibility for every source to
> implement a
> > > >>>>>> different
> > > >>>>>>> API" (as mentioned below).
> > > >>>>>>
> > > >>>>>> No, it doesn't.
> > > >>>>>>
> > > >>>>>>> IMO, it is a hard-requirement for the user-facing API to be
> > > >>>>>>> clearly defined and users should be able to use the API without
> > > >>>> concern
> > > >>>>>> of
> > > >>>>>>> regression. And this requirement is more important than the
> other
> > > >>>> goals
> > > >>>>>>> discussed above because it is related to the
> stability/performance of
> > > >>>>> the
> > > >>>>>>> production job. What do you think?
> > > >>>>>>
> > > >>>>>> I don't agree with this. There are many things that work
> something in
> > > >>>>>> between perfectly and well enough
> > > >>>>>> in some fraction of use cases (maybe in 99%, maybe 95% or maybe
> 60%),
> > > >>>>> while
> > > >>>>>> still being very useful.
> > > >>>>>> Good examples are: selection of state backend, unaligned
> checkpoints,
> > > >>>>>> buffer debloating but frankly if I go
> > > >>>>>> through list of currently available config options, something
> like
> > > >> half
> > > >>>>> of
> > > >>>>>> them can cause regressions. Heck,
> > > >>>>>> even Flink itself doesn't work perfectly in 100% of the use
> cases, due
> > > >>>>> to a
> > > >>>>>> variety of design choices. Of
> > > >>>>>> course, the more use cases are fine with said feature, the
> better, but
> > > >>>> we
> > > > >>>>>> shouldn't fixate on perfectly covering
> > > >>>>>> 100% of the cases, as that's impossible.
> > > >>>>>>
> > > >>>>>> In this particular case, if back pressure monitoring  trigger
> can work
> > > >>>>> well
> > > >>>>>> enough in 95% of cases, I would
> > > >>>>>> say that's already better than the originally proposed
> alternative,
> > > >>>> which
> > > >>>>>> doesn't work at all if user has a large
> > > >>>>>> backlog to reprocess from Kafka, including when using
> HybridSource
> > > >>>> AFTER
> > > >>>>>> the switch to Kafka has
> > > >>>>>> happened. For the remaining 5%, we should try to improve the
> behaviour
> > > >>>>> over
> > > >>>>>> time, but ultimately, users can
> > > >>>>>> decide to just run a fixed checkpoint interval (or at worst use
> the
> > > >>>> hacky
> > > >>>>>> checkpoint trigger that I mentioned
> > > >>>>>> before a couple of times).
> > > >>>>>>
> > > >>>>>> Also to be pedantic, if a user naively selects slow-interval in
> your
> > > >>>>>> proposal to 30 minutes, when that user's
> > > > >>>>>> job fails on average every 15-20 minutes, the job can end up in a state where
> > > > >>>>>> it cannot make any progress,
> > > > >>>>>> which is arguably quite a serious regression.
> > > >>>>>>
> > > >>>>>
> > > >>>>> I probably should not say it is "hard requirement". After all
> there are
> > > >>>>> pros/cons. We will need to consider implementation complexity,
> > > >> usability,
> > > >>>>> extensibility etc.
> > > >>>>>
> > > >>>>> I just don't think we should take it for granted to introduce
> > > >> regression
> > > >>>>> for one use-case in order to support another use-case. If we can
> not
> > > >> find
> > > >>>>> an algorithm/solution that addresses
> > > >>>>> both use-case well, I hope we can be open to tackle them
> separately so
> > > >>>> that
> > > >>>>> users can choose the option that best fits their needs.
> > > >>>>>
> > > > >>>>> All else being equal, I think it is preferable for the user-facing API to
> > > > >>>>> be clearly defined and for users to be able to use the API without
> > > > >>>>> concern of regression.
> > > >>>>>
> > > > >>>>> Maybe we can list pros/cons for the alternative approaches we have been
> > > > >>>>> discussing and choose the best approach. And maybe we will end up
> > > > >>>>> finding that the use-case
> > > > >>>>> which needs CheckpointTrigger can be tackled separately from the use-case
> > > > >>>>> in FLIP-309.
> > > >>>>>
> > > >>>>>
> > > >>>>>>> I am not sure if there is a typo. Because if
> > > >>>>> backPressuredTimeMsPerSecond
> > > >>>>>> =
> > > >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
> > > >>>> numRecordsInPerSecond /
> > > >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
> > > >>>>>>>
> > > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
> > > >>>>>> (numRecordsInPerSecond
> > > >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
> > > >>>> metricsUpdateInterval"?
> > > >>>>>>
> > > > >>>>>> It looks like there is indeed some mistake in my proposal above. Yours
> > > > >>>>>> looks more correct, though it probably
> > > > >>>>>> still needs some safeguard/special handling if
> > > > >>>>>> `backPressuredTimeMsPerSecond > 950`
> > > >>>>>>
> > > >>>>>>> The only information it can access is the backlog.
> > > >>>>>>
> > > >>>>>> Again no. It can access whatever we want to provide to it.
> > > >>>>>>
> > > >>>>>> Regarding the rest of your concerns. It's a matter of tweaking
> the
> > > >>>>>> parameters and the algorithm itself,
> > > >>>>>> and how much safety-net do we want to have. Ultimately, I'm
> pretty
> > > >> sure
> > > >>>>>> that's a (for 95-99% of cases)
> > > >>>>>> solvable problem. If not, there is always the hacky solution,
> that
> > > >>>> could
> > > >>>>> be
> > > >>>>>> even integrated into this above
> > > >>>>>> mentioned algorithm as a short circuit to always reach
> > > >> `slow-interval`.
> > > >>>>>>
> > > > >>>>>> Apart from that, you picked 3 minutes as the checkpointing
> interval in
> > > >>>> your
> > > >>>>>> counter example. In most cases
> > > >>>>>> any interval above 1 minute would inflict pretty negligible
> overheads,
> > > >>>> so
> > > >>>>>> all in all, I would doubt there is
> > > >>>>>> a significant benefit (in most cases) of increasing 3 minute
> > > >> checkpoint
> > > >>>>>> interval to anything more, let alone
> > > >>>>>> 30 minutes.
> > > >>>>>>
> > > >>>>>
> > > >>>>> I am not sure we should design the algorithm with the assumption
> that
> > > >> the
> > > >>>>> short checkpointing interval will always be higher than 1 minute
> etc.
> > > >>>>>
> > > >>>>> I agree the proposed algorithm can solve most cases where the
> resource
> > > >> is
> > > >>>>> sufficient and there is always no backlog in source subtasks. On
> the
> > > >>>> other
> > > >>>>> hand, what makes SRE
> > > >>>>> life hard is probably the remaining 1-5% cases where the traffic
> is
> > > >> spiky
> > > >>>>> and the cluster is reaching its capacity limit. The ability to
> predict
> > > >>>> and
> > > >>>>> control Flink job's behavior (including checkpointing interval)
> can
> > > > >>>>> considerably reduce the burden of managing Flink jobs.
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Dong
> > > >>>>>
> > > >>>>>
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Piotrek
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > > >>>>>> On Sat, Jun 3, 2023 at 05:44 Dong Lin <lindon...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Piotr,
> > > >>>>>>>
> > > >>>>>>> Thanks for the explanations. I have some followup questions
> below.
> > > >>>>>>>
> > > >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <
> pnowoj...@apache.org
> > > >>>>>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi All,
> > > >>>>>>>>
> > > >>>>>>>> Thanks for chipping in the discussion Ahmed!
> > > >>>>>>>>
> > > >>>>>>>> Regarding using the REST API. Currently I'm leaning towards
> > > >>>>>> implementing
> > > >>>>>>>> this feature inside the Flink itself, via some pluggable
> interface.
> > > >>>>>>>> REST API solution would be tempting, but I guess not everyone
> is
> > > >>>>> using
> > > >>>>>>>> Flink Kubernetes Operator.
> > > >>>>>>>>
> > > >>>>>>>> @Dong
> > > >>>>>>>>
> > > >>>>>>>>> I am not sure metrics such as isBackPressured are already
> sent to
> > > >>>>> JM.
> > > >>>>>>>>
> > > >>>>>>>> Fetching code path on the JM:
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> > > >>>>>>>>
> > > >>>>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
> > > >>>>>>>>
> > > >>>>>>>> Example code path accessing Task level metrics via JM using
> the
> > > >>>>>>>> `MetricStore`:
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> Thanks for the code reference. I checked the code that invoked
> these
> > > >>>>> two
> > > >>>>>>> classes and found the following information:
> > > >>>>>>>
> > > >>>>>>> - AggregatingSubtasksMetricsHandler#getStores is currently
> invoked
> > > >>>> only
> > > >>>>>>> when AggregatingJobsMetricsHandler is invoked.
> > > >>>>>>> - AggregatingJobsMetricsHandler is only instantiated and
> returned by
> > > >>>>>>> WebMonitorEndpoint#initializeHandlers
> > > >>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by
> > > >>>>>> RestServerEndpoint.
> > > >>>>>>> And RestServerEndpoint invokes these handlers in response to
> external
> > > >>>>>> REST
> > > >>>>>>> request.
> > > >>>>>>>
> > > >>>>>>> I understand that JM will get the backpressure-related metrics
> every
> > > >>>>> time
> > > >>>>>>> the RestServerEndpoint receives the REST request to get these
> > > >>>> metrics.
> > > >>>>>> But
> > > >>>>>>> I am not sure if RestServerEndpoint is already always
> receiving the
> > > >>>>> REST
> > > >>>>>>> metrics at regular interval (suppose there is no human manually
> > > >>>>>>> opening/clicking the Flink Web UI). And if it does, what is the
> > > >>>>> interval?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>> For example, let's say every source operator subtask reports
> this
> > > >>>>>>> metric
> > > >>>>>>>> to
> > > >>>>>>>>> JM once every 10 seconds. There are 100 source subtasks. And
> each
> > > >>>>>>> subtask
> > > >>>>>>>>> is backpressured roughly 10% of the total time due to traffic
> > > >>>>> spikes
> > > >>>>>>> (and
> > > >>>>>>>>> limited buffer). Then at any given time, there are 1 -
> 0.9^100 =
> > > >>>>>>> 99.997%
> > > >>>>>>>>> chance that there is at least one subtask that is
> backpressured.
> > > >>>>> Then
> > > >>>>>>> we
> > > >>>>>>>>> have to wait for at least 10 seconds to check again.
> > > >>>>>>>>
> > > >>>>>>>> backPressuredTimeMsPerSecond and other related metrics (like
> > > >>>>>>>> busyTimeMsPerSecond) are not subject to that problem.
> > > >>>>>>>> They are recalculated once every metric fetching interval,
> and they
> > > >>>>>>> report
> > > >>>>>>>> accurately on average the given subtask spent
> > > >>>>>> busy/idling/backpressured.
> > > >>>>>>>> In your example, backPressuredTimeMsPerSecond would report
> 100ms/s.
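To make the two numbers in this exchange concrete, here is a minimal sketch of the arithmetic. It assumes the subtasks are backpressured independently of each other (the quoted 99.997% figure implicitly relies on this); the class and method names are hypothetical and are not part of Flink.

```java
// Hypothetical helper, not a Flink class: reproduces the arithmetic from the example.
final class BackpressureOdds {

    // Probability that at least one of `subtasks` subtasks is backpressured at a
    // given instant, ASSUMING each is independently backpressured for
    // `fraction` of the time.
    static double probAnyBackpressured(double fraction, int subtasks) {
        return 1.0 - Math.pow(1.0 - fraction, subtasks);
    }

    // Time-averaged view of the same situation: a subtask that is backpressured
    // for `fraction` of the time spends fraction * 1000 ms of every second backpressured.
    static double backPressuredTimeMsPerSecond(double fraction) {
        return fraction * 1000.0;
    }

    public static void main(String[] args) {
        System.out.printf("P(at least one backpressured) = %.5f%n",
                probAnyBackpressured(0.10, 100));
        System.out.printf("backPressuredTimeMsPerSecond  = %.0f ms/s%n",
                backPressuredTimeMsPerSecond(0.10));
    }
}
```

The point of contrast is that a boolean flag sampled across 100 subtasks is almost always "true", while the averaged metric stays at roughly 100 ms/s per subtask.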
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Suppose every subtask is already reporting
> > > >>>> backPressuredTimeMsPerSecond
> > > >>>>>> to
> > > >>>>>>> JM once every 100 ms. If a job has 10 operators (that are not
> > > >>>> chained)
> > > >>>>>> and
> > > >>>>>>> each operator has 100 subtasks, then JM would need to handle
> 10000
> > > >>>>>> requests
> > > >>>>>>> per second to receive metrics from these 1000 subtasks. It
> seems
> > > >>>> like a
> > > >>>>>>> non-trivial overhead for medium-to-large sized jobs and can
> make JM
> > > >>>> the
> > > >>>>>>> performance bottleneck during job execution.
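The request-rate estimate above can be checked with a short calculation. This is only a sketch under the assumption stated in the paragraph, namely that every subtask sends one metric report per reporting interval; the class and method names are hypothetical.

```java
// Hypothetical helper, not a Flink class: estimates the metric-report load on the JM.
final class MetricReportLoad {

    // Reports per second arriving at the JM, ASSUMING one report per subtask
    // per reporting interval.
    static long requestsPerSecond(int operators, int subtasksPerOperator, long reportIntervalMs) {
        long subtasks = (long) operators * subtasksPerOperator;
        return subtasks * 1000L / reportIntervalMs;
    }

    public static void main(String[] args) {
        // 10 unchained operators x 100 subtasks, reporting every 100 ms
        System.out.println(requestsPerSecond(10, 100, 100));
        // The same job reporting every 10 seconds
        System.out.println(requestsPerSecond(10, 100, 10_000));
    }
}
```

With a 100 ms interval the 1000 subtasks produce 10000 reports per second; with a 10 second interval the same job produces 100 per second.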
> > > >>>>>>>
> > > >>>>>>> I would be surprised if Flink is already paying this much
> overhead
> > > >>>> just
> > > >>>>>> for
> > > >>>>>>> metrics monitoring. That is the main reason I still doubt it
> is true.
> > > >>>>> Can
> > > >>>>>>> you show where this 100 ms is currently configured?
> > > >>>>>>>
> > > >>>>>>> Alternatively, maybe you mean that we should add extra code to
> invoke
> > > >>>>> the
> > > >>>>>>> REST API at 100 ms interval. Then that means we need to
> considerably
> > > >>>>>>> increase the network/cpu overhead at JM, where the overhead
> will
> > > >>>>> increase
> > > >>>>>>> as the number of TM/slots increase, which may pose risk to the
> > > >>>>>> scalability
> > > >>>>>>> of the proposed design. I am not sure we should do this. What
> do you
> > > >>>>>> think?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> While it will be nice to support additional use-cases
> > > >>>>>>>>> with one proposal, it is probably also reasonable to make
> > > >>>>> incremental
> > > >>>>>>>>> progress and support the low-hanging-fruit use-case first.
> The
> > > >>>>> choice
> > > >>>>>>>>> really depends on the complexity and the importance of
> supporting
> > > >>>>> the
> > > >>>>>>>> extra
> > > >>>>>>>>> use-cases.
> > > >>>>>>>>
> > > >>>>>>>> That would be true, if that was a private implementation
> detail or
> > > >>>> if
> > > >>>>>> the
> > > >>>>>>>> low-hanging-fruit-solution would be on the direct path to the
> final
> > > >>>>>>>> solution.
> > > >>>>>>>> That's unfortunately not the case here. This will add public
> facing
> > > >>>>>> API,
> > > >>>>>>>> that we will later need to maintain, no matter what the final
> > > >>>>> solution
> > > >>>>>>> will
> > > >>>>>>>> be,
> > > >>>>>>>> and at the moment at least I don't see it being related to a
> > > >>>>> "perfect"
> > > >>>>>>>> solution.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Sure. Then let's decide the final solution first.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>> I guess the point is that the suggested approach, which
> > > >>>> dynamically
> > > >>>>>>>>> determines the checkpointing interval based on the
> backpressure,
> > > >>>>> may
> > > >>>>>>>> cause
> > > >>>>>>>>> regression when the checkpointing interval is relatively low.
> > > >>>> This
> > > >>>>>>> makes
> > > >>>>>>>> it
> > > >>>>>>>>> hard for users to enable this feature in production. It is
> like
> > > >>>> an
> > > >>>>>>>>> auto-driving system that is not guaranteed to work
> > > >>>>>>>>
> > > >>>>>>>> Yes, creating a more generic solution that would require less
> > > >>>>>>> configuration
> > > >>>>>>>> is usually more difficult than static configurations.
> > > >>>>>>>> It doesn't mean we shouldn't try to do that. Especially that
> if my
> > > >>>>>>> proposed
> > > >>>>>>>> algorithm wouldn't work well enough, there is
> > > >>>>>>>> an obvious solution, that any source could add a metric, like
> let
> > > >>>> say
> > > >>>>>>>> "processingBacklog: true/false", and the `CheckpointTrigger`
> > > >>>>>>>> could use this as an override to always switch to the
> > > >>>>>>>> "slowCheckpointInterval". I don't think we need it, but that's
> > > >>>> always
> > > >>>>>> an
> > > >>>>>>>> option
> > > >>>>>>>> that would be basically equivalent to your original proposal.
> Or
> > > >>>> even
> > > >>>>>>>> source could add "suggestedCheckpointInterval : int", and
> > > >>>>>>>> `CheckpointTrigger` could use that value if present as a hint
> in
> > > >>>> one
> > > >>>>>> way
> > > >>>>>>> or
> > > >>>>>>>> another.
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> So far we have talked about the possibility of using
> > > >>>> CheckpointTrigger
> > > >>>>>> and
> > > >>>>>>> mentioned that the CheckpointTrigger
> > > >>>>>>> can read metric values.
> > > >>>>>>>
> > > >>>>>>> Can you help answer the following questions so that I can
> understand
> > > >>>>> the
> > > >>>>>>> alternative solution more concretely:
> > > >>>>>>>
> > > >>>>>>> - What is the interface of this CheckpointTrigger? For
> example, are
> > > >>>> we
> > > >>>>>>> going to give CheckpointTrigger a context that it can use to
> fetch
> > > >>>>>>> arbitrary metric values? This can help us understand what
> information
> > > >>>>>> this
> > > >>>>>>> user-defined CheckpointTrigger can use to make the checkpoint
> > > >>>> decision.
> > > >>>>>>> - Where is this CheckpointTrigger running? For example, is it
> going
> > > >>>> to
> > > >>>>>> run
> > > >>>>>>> on the subtask of every source operator? Or is it going to run
> on the
> > > >>>>> JM?
> > > >>>>>>> - Are we going to provide a default implementation of this
> > > >>>>>>> CheckpointTrigger in Flink that implements the algorithm
> described
> > > >>>>> below,
> > > >>>>>>> or do we expect each source operator developer to implement
> their own
> > > >>>>>>> CheckpointTrigger?
> > > >>>>>>> - How can users specify the
> > > >>>>>> fastCheckpointInterval/slowCheckpointInterval?
> > > >>>>>>> For example, will we provide APIs on the CheckpointTrigger that
> > > >>>>> end-users
> > > >>>>>>> can use to specify the checkpointing interval? What would that
> look
> > > >>>>> like?
> > > >>>>>>>
> > > >>>>>>> Overall, my gut feel is that the alternative approach based on
> > > >>>>>>> CheckpointTrigger is more complicated and harder to use. And it
> > > >>>>> probably
> > > >>>>>>> also has the issues of "having two places to configure
> checkpointing
> > > >>>>>>> interval" and "giving flexibility for every source to
> implement a
> > > >>>>>> different
> > > >>>>>>> API" (as mentioned below).
> > > >>>>>>>
> > > >>>>>>> Maybe we can evaluate it more after knowing the answers to the
> above
> > > >>>>>>> questions.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> On the other hand, the approach currently proposed in the
> FLIP is
> > > >>>>>> much
> > > >>>>>>>>> simpler as it does not depend on backpressure. Users specify
> the
> > > >>>>>> extra
> > > >>>>>>>>> interval requirement on the specific sources (e.g.
> HybridSource,
> > > >>>>>> MySQL
> > > >>>>>>>> CDC
> > > >>>>>>>>> Source) and can easily know the checkpointing interval will
> be
> > > >>>> used
> > > >>>>>> on
> > > >>>>>>>> the
> > > >>>>>>>>> continuous phase of the corresponding source. This is pretty
> much
> > > >>>>>> same
> > > >>>>>>> as
> > > >>>>>>>>> how users use the existing execution.checkpointing.interval
> > > >>>> config.
> > > >>>>>> So
> > > >>>>>>>>> there is no extra concern of regression caused by this
> approach.
> > > >>>>>>>>
> > > >>>>>>>> To an extent, but as I have already previously mentioned I
> really
> > > >>>>>> really
> > > >>>>>>> do
> > > >>>>>>>> not like the idea of:
> > > >>>>>>>> - having two places to configure checkpointing interval
> (config
> > > >>>>> file
> > > >>>>>>> and
> > > >>>>>>>> in the Source builders)
> > > >>>>>>>> - giving flexibility for every source to implement a different
> > > >>>> API
> > > >>>>>> for
> > > >>>>>>>> that purpose
> > > >>>>>>>> - creating a solution that is not generic enough, so that we
> will
> > > >>>>>> need
> > > >>>>>>> a
> > > >>>>>>>> completely different mechanism in the future anyway
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> Yeah, I understand different developers might have different
> > > >>>>>>> concerns/tastes for these APIs. Ultimately, there might not be
> a
> > > >>>>> perfect
> > > >>>>>>> solution and we have to choose based on the pros/cons of these
> > > >>>>> solutions.
> > > >>>>>>>
> > > >>>>>>> I agree with you that, all things being equal, it is
> preferable to 1)
> > > >>>>>> have
> > > >>>>>>> one place to configure checkpointing intervals, 2) have all
> source
> > > >>>>>>> operators use the same API, and 3) create a solution that is
> generic
> > > >>>>> and
> > > >>>>>>> long lasting. Note that these three goals affect the
> usability and
> > > >>>>>>> extensibility of the API, but not necessarily the
> > > >>>> stability/performance
> > > >>>>>> of
> > > >>>>>>> the production job.
> > > >>>>>>>
> > > >>>>>>> BTW, there are also other preferable goals. For example, it
> is very
> > > >>>>>> useful
> > > >>>>>>> for the job's behavior to be predictable and interpretable so
> that
> > > >>>> SRE
> > > >>>>>> can
> > > >>>>>>> operate/debug Flink in an easier way. We can list these
> > > >>>> pros/cons
> > > >>>>>>> altogether later.
> > > >>>>>>>
> > > >>>>>>> I am wondering if we can first agree on the priority of goals
> we want
> > > >>>>> to
> > > >>>>>>> achieve. IMO, it is a hard-requirement for the user-facing API
> to be
> > > >>>>>>> clearly defined and users should be able to use the API without
> > > >>>> concern
> > > >>>>>> of
> > > >>>>>>> regression. And this requirement is more important than the
> other
> > > >>>> goals
> > > >>>>>>> discussed above because it is related to the
> stability/performance of
> > > >>>>> the
> > > >>>>>>> production job. What do you think?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> Sounds good. Looking forward to learning more ideas.
> > > >>>>>>>>
> > > >>>>>>>> I have thought about this a bit more, and I think we don't
> need to
> > > >>>>>> check
> > > >>>>>>>> for the backpressure status, or how much overloaded all of the
> > > >>>>>> operators
> > > >>>>>>>> are.
> > > >>>>>>>> We could just check three things for source operators:
> > > >>>>>>>> 1. pendingRecords (backlog length)
> > > >>>>>>>> 2. numRecordsInPerSecond
> > > >>>>>>>> 3. backPressuredTimeMsPerSecond
> > > >>>>>>>>
> > > >>>>>>>> // int metricsUpdateInterval = 10s // obtained from config
> > > >>>>>>>> // Next line calculates how many records can we consume from
> the
> > > >>>>>> backlog,
> > > >>>>>>>> assuming
> > > >>>>>>>> // that magically the reason behind a backpressure vanishes.
> We
> > > >>>> will
> > > >>>>>> use
> > > >>>>>>>> this only as
> > > >>>>>>>> // a safeguard  against scenarios like for example if
> backpressure
> > > >>>>> was
> > > >>>>>>>> caused by some
> > > >>>>>>>> // intermittent failure/performance degradation.
> > > >>>>>>>> maxRecordsConsumedWithoutBackpressure =
> (numRecordsInPerSecond /
> > > >>>>> (1000
> > > >>>>>>>> - backPressuredTimeMsPerSecond / 1000)) *
> metricsUpdateInterval
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> I am not sure if there is a typo. Because if
> > > >>>>>> backPressuredTimeMsPerSecond =
> > > >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
> > > >>>> numRecordsInPerSecond /
> > > >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
> > > >>>>>>>
> > > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
> > > >>>>>> (numRecordsInPerSecond
> > > >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
> > > >>>> metricsUpdateInterval"?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure
> from the
> > > >>>>>>> backlog
> > > >>>>>>>> as
> > > >>>>>>>> // a safeguard against intermittent backpressure
> problems, so
> > > >>>>> that
> > > >>>>>> we
> > > >>>>>>>> don't
> > > >>>>>>>> // calculate next checkpoint interval far far in the future,
> while
> > > >>>>> the
> > > >>>>>>>> backpressure
> > > >>>>>>>> // goes away before we will recalculate metrics and new
> > > >>>> checkpointing
> > > >>>>>>>> interval
> > > >>>>>>>> timeToConsumeBacklog = (pendingRecords -
> > > >>>>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Then we can use those numbers to calculate desired
> checkpointing
> > > >>>>>> interval
> > > >>>>>>>> for example like this:
> > > >>>>>>>>
> > > >>>>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
> > > >>>> //this
> > > >>>>>> may
> > > >>>>>>>> need some refining
> > > >>>>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval,
> > > >>>>>>>> calculatedCheckpointInterval), slowCheckpointInterval);
> > > >>>>>>>> long nextCheckpointTs = lastCheckpointTs +
> nextCheckpointInterval;
> > > >>>>>>>>
> > > >>>>>>>> WDYT?
> > > >>>>>>>
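The proposed calculation can be put together as a small runnable sketch. It uses the corrected reading of the first formula suggested in the reply, i.e. dividing by (1 - backPressuredTimeMsPerSecond / 1000). The class name, the units (seconds for metricsUpdateInterval, milliseconds for the checkpoint intervals), and the two guards for zero throughput and full backpressure are assumptions added for illustration; they are not taken from the thread or from Flink.

```java
// Hypothetical sketch of the proposed algorithm; not a Flink class.
final class BacklogCheckpointInterval {

    // Records the source could consume in one metrics update interval if the
    // cause of backpressure vanished (corrected formula from the reply).
    static double maxRecordsConsumedWithoutBackpressure(
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSec) {
        double notBackpressured = 1.0 - backPressuredTimeMsPerSecond / 1000.0;
        if (notBackpressured <= 0.0) {
            // ASSUMPTION: when fully backpressured, treat the achievable rate as unbounded.
            return Double.POSITIVE_INFINITY;
        }
        return numRecordsInPerSecond / notBackpressured * metricsUpdateIntervalSec;
    }

    // Next checkpoint interval in ms, clamped to [fastMs, slowMs].
    static long nextCheckpointIntervalMs(
            long pendingRecords,
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSec,
            long fastMs,
            long slowMs) {
        if (numRecordsInPerSecond <= 0.0) {
            // ASSUMPTION: without throughput information, fall back to the fast interval.
            return fastMs;
        }
        double safeguard = maxRecordsConsumedWithoutBackpressure(
                numRecordsInPerSecond, backPressuredTimeMsPerSecond, metricsUpdateIntervalSec);
        // Exclude the safeguard from the backlog; never go below zero.
        double timeToConsumeBacklogSec =
                Math.max(0.0, (pendingRecords - safeguard) / numRecordsInPerSecond);
        // "/ 10" is the placeholder factor from the proposal ("may need some refining").
        long calculatedMs = (long) (timeToConsumeBacklogSec * 1000.0 / 10.0);
        return Math.min(Math.max(fastMs, calculatedMs), slowMs);
    }

    public static void main(String[] args) {
        long fastMs = 3 * 60_000L;
        long slowMs = 30 * 60_000L;
        // About 10 hours of backlog at 1000 records/s, no backpressure
        System.out.println(nextCheckpointIntervalMs(36_000_000L, 1000.0, 0.0, 10.0, fastMs, slowMs));
        // About 10 minutes of backlog, backpressured half of the time
        System.out.println(nextCheckpointIntervalMs(600_000L, 1000.0, 500.0, 10.0, fastMs, slowMs));
    }
}
```

In this sketch a large backlog is clamped to the slow interval, and a small backlog falls back to the fast interval.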
> > > >>>>>>>
> > > >>>>>>> I think the idea of the above algorithm is to incline to use
> the
> > > >>>>>>> fastCheckpointInterval unless we are very sure the backlog
> will take
> > > >>>> a
> > > >>>>>> long
> > > >>>>>>> time to process. This can alleviate the concern of regression
> during
> > > >>>>> the
> > > >>>>>>> continuous_unbounded phase since we are more likely to use the
> > > >>>>>>> fastCheckpointInterval. However, it can cause regression
> during the
> > > >>>>>> bounded
> > > >>>>>>> phase.
> > > >>>>>>>
> > > >>>>>>> I will use a concrete example to explain the risk of
> regression:
> > > >>>>>>> - The user is using HybridSource to read from HDFS followed by
> Kafka.
> > > >>>>> The
> > > >>>>>>> data in HDFS is old and there is no need for data freshness
> for the
> > > >>>>> data
> > > >>>>>> in
> > > >>>>>>> HDFS.
> > > >>>>>>> - The user configures the job as below:
> > > >>>>>>> - fastCheckpointInterval = 3 minutes
> > > >>>>>>> - slowCheckpointInterval = 30 minutes
> > > >>>>>>> - metricsUpdateInterval = 100 ms
> > > >>>>>>>
> > > >>>>>>> Using the above formula, we can know that once pendingRecords
> > > >>>>>>> <= numRecordsInPerSecond * 30-minutes, then
> > > >>>>> calculatedCheckpointInterval
> > > >>>>>> <=
> > > >>>>>>> 3 minutes, meaning that we will use fastCheckpointInterval as
> the
> > > >>>>>>> checkpointing interval. Then in the last 30 minutes of the
> bounded
> > > >>>>> phase,
> > > >>>>>>> the checkpointing frequency will be 10X higher than what the
> user
> > > >>>>> wants.
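The regression in this example can be tabulated with a few lines of code. This is a simplified sketch: it measures the backlog in minutes of processing time and ignores the small safeguard term (with metricsUpdateInterval = 100 ms it is negligible), so only the "/ 10" factor and the min/max clamp remain. The class and method names are hypothetical.

```java
// Hypothetical worked example; not a Flink class.
final class BoundedPhaseRegression {

    // Interval chosen by min(max(fast, backlog / 10), slow), all values in minutes.
    static long chosenIntervalMinutes(long backlogMinutes, long fastMin, long slowMin) {
        long calculated = backlogMinutes / 10;
        return Math.min(Math.max(fastMin, calculated), slowMin);
    }

    public static void main(String[] args) {
        long fastMin = 3;
        long slowMin = 30;
        long[] backlogs = {600, 120, 30, 10};
        for (long backlog : backlogs) {
            System.out.println(backlog + " min of backlog -> checkpoint every "
                    + chosenIntervalMinutes(backlog, fastMin, slowMin) + " min");
        }
    }
}
```

Once the remaining backlog drops to 30 minutes or less, the chosen interval is 3 minutes, which is 10 times more frequent than the 30 minutes the user asked for in the bounded phase.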
> > > >>>>>>>
> > > >>>>>>> Also note that the same issue would also considerably limit the
> > > >>>>> benefits
> > > >>>>>> of
> > > >>>>>>> the algorithm. For example, during the continuous phase, the
> > > >>>> algorithm
> > > >>>>>> will
> > > >>>>>>> only be better than the approach in FLIP-309 when there is at
> least
> > > >>>>>>> 30-minutes worth of backlog in the source.
> > > >>>>>>>
> > > >>>>>>> Sure, having a slower checkpointing interval in this extreme
> case
> > > >>>>> (where
> > > >>>>>>> there is 30-minutes backlog in the continuous-unbounded phase)
> is
> > > >>>> still
> > > >>>>>>> useful when this happens. But since this is the uncommon
> case, and
> > > >>>> the
> > > >>>>>>> right solution is probably to do capacity planning to avoid
> this from
> > > >>>>>>> happening in the first place, I am not sure it is worth
> optimizing
> > > >>>> for
> > > >>>>>> this
> > > >>>>>>> case at the cost of regression in the bounded phase and the
> reduced
> > > >>>>>>> operational predictability for users (e.g. what checkpointing
> > > >>>> interval
> > > >>>>>>> should I expect at this stage of the job).
> > > >>>>>>>
> > > >>>>>>> I think the fundamental issue with this algorithm is that it is
> > > >>>> applied
> > > >>>>>> to
> > > >>>>>>> both the bounded phases and the continuous_unbounded phases
> without
> > > >>>>>> knowing
> > > >>>>>>> which phase the job is running at. The only information it can
> access
> > > >>>>> is
> > > >>>>>>> the backlog. But two sources with the same amount of backlog
> do not
> > > >>>>>>> necessarily mean they have the same data freshness requirement.
> > > >>>>>>>
> > > >>>>>>> In this particular example, users know that the data in HDFS
> is very
> > > >>>>> old
> > > >>>>>>> and there is no need for data freshness. Users can express
> signals
> > > >>>> via
> > > >>>>>> the
> > > >>>>>>> per-source API proposed in the FLIP. This is why the current
> approach
> > > >>>>> in
> > > >>>>>>> FLIP-309 can be better in this case.
> > > >>>>>>>
> > > >>>>>>> What do you think?
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Dong
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Piotrek
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > > >>
> > >
> > >
> >
>
>
