Hi Piotr,

Thanks for the explanations. I have some followup questions below.

On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi All,
>
> Thanks for chipping in the discussion Ahmed!
>
> Regarding using the REST API. Currently I'm leaning towards implementing
> this feature inside the Flink itself, via some pluggable interface.
> REST API solution would be tempting, but I guess not everyone is using
> Flink Kubernetes Operator.
>
> @Dong
>
> > I am not sure metrics such as isBackPressured are already sent to JM.
>
> Fetching code path on the JM:
>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>
> Example code path accessing Task level metrics via JM using the
> `MetricStore`:
>
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>

Thanks for the code reference. I checked the code that invokes these two
classes and found the following:

- AggregatingSubtasksMetricsHandler#getStores is currently invoked only
when AggregatingJobsMetricsHandler is invoked.
- AggregatingJobsMetricsHandler is only instantiated and returned by
WebMonitorEndpoint#initializeHandlers.
- WebMonitorEndpoint#initializeHandlers is only used by RestServerEndpoint.
And RestServerEndpoint invokes these handlers only in response to external
REST requests.

I understand that the JM will fetch the backpressure-related metrics every
time the RestServerEndpoint receives a REST request for these metrics. But
I am not sure whether RestServerEndpoint already receives such REST
requests at a regular interval (supposing no human is manually
opening/clicking the Flink Web UI). And if it does, what is the interval?



> > For example, let's say every source operator subtask reports this metric
> to
> > JM once every 10 seconds. There are 100 source subtasks. And each subtask
> > is backpressured roughly 10% of the total time due to traffic spikes (and
> > limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> > chance that there is at least one subtask that is backpressured. Then we
> > have to wait for at least 10 seconds to check again.
>
> backPressuredTimeMsPerSecond and other related metrics (like
> busyTimeMsPerSecond) are not subject to that problem.
> They are recalculated once every metric fetching interval, and they report
> accurately on average the given subtask spent busy/idling/backpressured.
> In your example, backPressuredTimeMsPerSecond would report 100ms/s.


Suppose every subtask is already reporting backPressuredTimeMsPerSecond to
JM once every 100 ms. If a job has 10 operators (that are not chained) and
each operator has 100 subtasks, then JM would need to handle 10000 requests
per second to receive metrics from these 1000 subtasks. That seems like
non-trivial overhead for medium-to-large sized jobs and could make JM a
performance bottleneck during job execution.
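
Just to spell out the arithmetic behind those numbers (this only reflects
the reporting model I am assuming, i.e. every subtask pushing its metrics
to the JM independently every 100 ms):

int operators = 10;
int subtasksPerOperator = 100;                // 1000 subtasks in total
int reportsPerSubtaskPerSecond = 1000 / 100;  // one report every 100 ms
int reportsPerSecondAtJm = operators * subtasksPerOperator * reportsPerSubtaskPerSecond;
// = 10 * 100 * 10 = 10000 metric reports per second arriving at the JM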

I would be surprised if Flink were already paying this much overhead just
for metrics monitoring, which is the main reason I still doubt this is the
case. Can you show me where this 100 ms interval is currently configured?

Alternatively, maybe you mean that we should add extra code to invoke the
REST API at a 100 ms interval. That would considerably increase the
network/CPU overhead on the JM, and the overhead would grow as the number
of TMs/slots increases, which may pose a risk to the scalability of the
proposed design. I am not sure we should do this. What do you think?


>
> > While it will be nice to support additional use-cases
> > with one proposal, it is probably also reasonable to make incremental
> > progress and support the low-hanging-fruit use-case first. The choice
> > really depends on the complexity and the importance of supporting the
> extra
> > use-cases.
>
> That would be true, if that was a private implementation detail or if the
> low-hanging-fruit-solution would be on the direct path to the final
> solution.
> That's unfortunately not the case here. This will add public facing API,
> that we will later need to maintain, no matter what the final solution will
> be,
> and at the moment at least I don't see it being related to a "perfect"
> solution.


Sure. Then let's decide the final solution first.


> > I guess the point is that the suggested approach, which dynamically
> > determines the checkpointing interval based on the backpressure, may
> cause
> > regression when the checkpointing interval is relatively low. This makes
> it
> > hard for users to enable this feature in production. It is like an
> > auto-driving system that is not guaranteed to work
>
> Yes, creating a more generic solution that would require less configuration
> is usually more difficult then static configurations.
> It doesn't mean we shouldn't try to do that. Especially that if my proposed
> algorithm wouldn't work good enough, there is
> an obvious solution, that any source could add a metric, like let say
> "processingBacklog: true/false", and the `CheckpointTrigger`
> could use this as an override to always switch to the
> "slowCheckpointInterval". I don't think we need it, but that's always an
> option
> that would be basically equivalent to your original proposal. Or even
> source could add "suggestedCheckpointInterval : int", and
> `CheckpointTrigger` could use that value if present as a hint in one way or
> another.
>

So far we have talked about the possibility of using a CheckpointTrigger
that can read metric values.

Can you help answer the following questions so that I can understand the
alternative solution more concretely:

- What is the interface of this CheckpointTrigger? For example, are we
going to give CheckpointTrigger a context that it can use to fetch
arbitrary metric values? This can help us understand what information this
user-defined CheckpointTrigger can use to make the checkpoint decision (a
rough sketch of what I am imagining is included after these questions).
- Where is this CheckpointTrigger running? For example, is it going to run
on the subtask of every source operator? Or is it going to run on the JM?
- Are we going to provide a default implementation of this
CheckpointTrigger in Flink that implements the algorithm described below,
or do we expect each source operator developer to implement their own
CheckpointTrigger?
- How can users specify the fastCheckpointInterval/slowCheckpointInterval?
For example, will we provide APIs on the CheckpointTrigger that end-users
can use to specify the checkpointing interval? What would that look like?
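
To make the first question above more concrete, here is a purely
hypothetical sketch of the kind of interface I am imagining. The names and
methods below are my own assumptions for the sake of discussion, not an
existing or proposed Flink API:

import java.time.Duration;

public interface CheckpointTrigger {
    // e.g. gives the trigger access to JM-side aggregated metrics and its configuration
    void open(CheckpointTriggerContext context);

    // called periodically on the JM to decide how long to wait until the next checkpoint
    Duration getNextCheckpointInterval();
}

interface CheckpointTriggerContext {
    // e.g. read an aggregated job-level metric such as backPressuredTimeMsPerSecond
    double getAggregatedMetric(String metricName);
}

Knowing whether something along these lines is what you have in mind would
make it easier to compare the two approaches.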

Overall, my gut feel is that the alternative approach based on
CheckpointTrigger is more complicated and harder to use. And it probably
also has the issues of "having two places to configure checkpointing
interval" and "giving flexibility for every source to implement a different
API" (as mentioned below).

Maybe we can evaluate it more after knowing the answers to the above
questions.



>
> > On the other hand, the approach currently proposed in the FLIP is much
> > simpler as it does not depend on backpressure. Users specify the extra
> > interval requirement on the specific sources (e.g. HybridSource, MySQL
> CDC
> > Source) and can easily know the checkpointing interval will be used on
> the
> > continuous phase of the corresponding source. This is pretty much same as
> > how users use the existing execution.checkpointing.interval config. So
> > there is no extra concern of regression caused by this approach.
>
> To an extent, but as I have already previously mentioned I really really do
> not like idea of:
>   - having two places to configure checkpointing interval (config file and
> in the Source builders)
>   - giving flexibility for every source to implement a different API for
> that purpose
>   - creating a solution that is not generic enough, so that we will need a
> completely different mechanism in the future anyway
>

Yeah, I understand different developers might have different
concerns/tastes for these APIs. Ultimately, there might not be a perfect
solution and we have to choose based on the pros/cons of these solutions.

I agree with you that, all things being equal, it is preferable to 1) have
one place to configure checkpointing intervals, 2) have all source
operators use the same API, and 3) create a solution that is generic and
long-lasting. Note that these three goals affect the usability and
extensibility of the API, but not necessarily the stability/performance of
the production job.

BTW, there are also other preferable goals. For example, it is very useful
for the job's behavior to be predictable and interpretable so that SREs can
operate/debug Flink more easily. We can list these pros/cons altogether
later.

I am wondering if we can first agree on the priority of the goals we want
to achieve. IMO, it is a hard requirement that the user-facing API be
clearly defined and that users be able to use the API without concern of
regression. This requirement is more important than the other goals
discussed above because it is related to the stability/performance of the
production job. What do you think?


>
> > Sounds good. Looking forward to learning more ideas.
>
> I have thought about this a bit more, and I think we don't need to check
> for the backpressure status, or how much overloaded all of the operators
> are.
> We could just check three things for source operators:
> 1. pendingRecords (backlog length)
> 2. numRecordsInPerSecond
> 3. backPressuredTimeMsPerSecond
>
> // int metricsUpdateInterval = 10s // obtained from config
> // Next line calculates how many records can we consume from the backlog,
> assuming
> // that magically the reason behind a backpressure vanishes. We will use
> this only as
> // a safeguard  against scenarios like for example if backpressure was
> caused by some
> // intermittent failure/performance degradation.
> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1000
> - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
>

I am not sure if there is a typo here, because if
backPressuredTimeMsPerSecond = 0, then the above algorithm gives
maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond / 1000 *
metricsUpdateInterval.

Do you mean "maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond
/ (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval"?
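
To illustrate with concrete numbers (the values below are picked purely
for illustration):

double numRecordsInPerSecond = 1000;
double backPressuredTimeMsPerSecond = 0;
double metricsUpdateInterval = 10;  // seconds
// formula as written in your email:
double asWritten = numRecordsInPerSecond / (1000 - backPressuredTimeMsPerSecond / 1000) * metricsUpdateInterval;  // = 10 records
// formula with the suspected fix:
double withFix = numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000) * metricsUpdateInterval;       // = 10000 records
// 10000 matches what a never-backpressured subtask consuming 1000 records/s can process in 10 seconds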


>
> // we are excluding maxRecordsConsumedWithoutBackpressure from the backlog
> as
> // a safeguard against an intermittent back pressure problems, so that we
> don't
> // calculate next checkpoint interval far far in the future, while the
> backpressure
> // goes away before we will recalculate metrics and new checkpointing
> interval
> timeToConsumeBacklog = (pendingRecords -
> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
>
>
> Then we can use those numbers to calculate desired checkpointed interval
> for example like this:
>
> long calculatedCheckpointInterval = timeToConsumeBacklog / 10; //this may
> need some refining
> long nextCheckpointInterval = min(max(fastCheckpointInterval,
> calculatedCheckpointInterval), slowCheckpointInterval);
> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
>
> WDYT?


I think the idea of the above algorithm is to lean towards using the
fastCheckpointInterval unless we are very sure the backlog will take a long
time to process. This can alleviate the concern of regression during the
continuous_unbounded phase, since we are more likely to use the
fastCheckpointInterval. However, it can cause regression during the bounded
phase.
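
Restating my understanding of the decision rule from your pseudocode, so I
can refer to it below:

long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
long nextCheckpointInterval =
    Math.min(Math.max(fastCheckpointInterval, calculatedCheckpointInterval), slowCheckpointInterval);
// i.e. fastCheckpointInterval is used unless timeToConsumeBacklog > 10 * fastCheckpointInterval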

I will use a concrete example to explain the risk of regression:
- The user is using HybridSource to read from HDFS followed by Kafka. The
data in HDFS is old and there is no need for data freshness for the data in
HDFS.
- The user configures the job as below:
  - fastCheckpointInterval = 3 minutes
  - slowCheckpointInterval = 30 minutes
  - metricsUpdateInterval = 100 ms

Using the above formulas, we know that once pendingRecords
<= numRecordsInPerSecond * 30 minutes, then calculatedCheckpointInterval <=
3 minutes, meaning that we will use fastCheckpointInterval (3 minutes) as
the checkpointing interval. Then in the last 30 minutes of the bounded
phase, the checkpointing frequency will be 10X higher than what the user
wants.
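
Plugging illustrative numbers into your formulas (assuming
numRecordsInPerSecond = 10000 purely for this example, and ignoring the
maxRecordsConsumedWithoutBackpressure safeguard since it is small relative
to the backlog):

long numRecordsInPerSecond = 10000;
long pendingRecords = numRecordsInPerSecond * 30 * 60;               // exactly 30 minutes' worth of backlog
long timeToConsumeBacklog = pendingRecords / numRecordsInPerSecond;  // 1800 s = 30 minutes
long calculatedCheckpointInterval = timeToConsumeBacklog / 10;       // 180 s = 3 minutes
// nextCheckpointInterval = min(max(3 min, 3 min), 30 min) = 3 min = fastCheckpointInterval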

Note that the same issue would also considerably limit the benefits of the
algorithm. For example, during the continuous phase, the algorithm will
only be better than the approach in FLIP-309 when there is at least 30
minutes' worth of backlog in the source.

Sure, having a slower checkpointing interval in this extreme case (where
there is a 30-minute backlog in the continuous-unbounded phase) is still
useful when it happens. But since this is the uncommon case, and the right
solution is probably to do capacity planning to prevent it from happening
in the first place, I am not sure it is worth optimizing for this case at
the cost of regression in the bounded phase and reduced operational
predictability for users (e.g. what checkpointing interval should I expect
at this stage of the job?).

I think the fundamental issue with this algorithm is that it is applied to
both the bounded phase and the continuous-unbounded phase without knowing
which phase the job is currently in. The only information it can access is
the backlog. But two sources with the same amount of backlog do not
necessarily have the same data freshness requirement.

In this particular example, users know that the data in HDFS is very old
and there is no need for data freshness. Users can express this signal via
the per-source API proposed in the FLIP. This is why the current approach
in FLIP-309 can be better in this case.

What do you think?

Best,
Dong


>
> Best,
> Piotrek
>
>
