Hi Francesco!

I think this would be a reasonable and nice improvement!

Cheers,
Gyula

On Sat, Mar 8, 2025 at 6:17 AM Francesco D <francescodch...@gmail.com>
wrote:

> Hi all,
>
> I’d like to discuss an issue we encountered while using the Flink OSS
> autoscaler with jobs that have high parallelism (1k+ subtasks). I’ll share
> the solution we've implemented internally and would like to hear whether
> others in the community have faced similar challenges. Additionally, I’m
> curious whether our solution could be a useful contribution to Flink OSS.
>
> The Issue:
> While analyzing autoscaling decisions for high-parallelism jobs, we noticed
> frequent connection timeouts when the autoscaler tried to fetch
> vertex-level metrics, even after increasing the connection timeout to one
> minute. Through flamegraph analysis, we identified that the root cause lies
> in the current metrics refresh process in the JM, which is triggered almost
> every time any /metrics endpoint is called. This process is synchronous,
> waiting for metrics from all TMs before aggregation.
>
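> To make this concrete, here is a simplified, purely illustrative sketch
> of that synchronous pattern (pseudo-Java, not the actual Flink internals;
> the helper names are hypothetical):
>
> // Every /metrics call can trigger a full refresh that fans out to
> // every TM and waits for all of them before aggregating.
> void update() {
>     List<CompletableFuture<MetricDump>> dumps = new ArrayList<>();
>     for (MetricQueryServiceGateway tm : allTaskManagerGateways()) {
>         dumps.add(tm.queryMetrics(timeout)); // one RPC per TM
>     }
>     // One slow TM delays the whole response; with 1k+ subtasks this
>     // wait can exceed a minute and blow the connection timeout.
>     CompletableFuture.allOf(dumps.toArray(new CompletableFuture[0])).join();
>     aggregateIntoMetricStore(dumps);
> }
>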
> While this approach is simple, it's inefficient for clients like the
> autoscaler or the Web UI, which typically require only a small subset of
> metrics (fewer than 20). For larger jobs, the full refresh can involve
> thousands of metrics, introducing significant latency—sometimes over a
> minute—and leading to connection timeouts. These delays disrupt the
> autoscaler's ability to make timely scaling decisions.
>
> Our Solution:
> To address this, we added a new method to the MetricFetcher interface to
> allow clients to fetch only the metrics they need, instead of fetching all
> metrics at once:
>
> /** Fetch metrics based on the provided set of metric names. */
> void update(Set<String> names);
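>
> For illustration, a caller such as the autoscaler could then request only
> the handful of metrics it needs (a usage sketch; the metric names are
> examples and the exact wiring is up to the caller):
>
> // Fetch just the metrics relevant to a scaling decision.
> Set<String> needed = Set.of(
>         "busyTimeMsPerSecond",
>         "numRecordsInPerSecond",
>         "pendingRecords");
> metricFetcher.update(needed); // filtered fetch instead of a full refresh
> MetricStore metrics = metricFetcher.getMetricStore();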
>
> This change significantly reduces the data transferred, speeds up
> aggregation, and minimizes the risk of timeouts. We also kept the original
> update() method, both for backward compatibility and for cases where
> filtering by name isn't feasible.
>
> Additionally, we introduced the following safeguards to protect the JM/TM
> against OOM issues (a simplified sketch follows the list):
>
> 1. A new fetch won’t trigger if a full fetch is already in progress.
> 2. A minimum interval is enforced between fetches to avoid overloading
> the system.
>
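> A minimal sketch of those safeguards inside a hypothetical fetcher
> implementation (field and helper names are illustrative):
>
> private final AtomicBoolean fullFetchInProgress = new AtomicBoolean(false);
> private volatile long lastFetchTimeMillis;
> private static final long MIN_FETCH_INTERVAL_MS = 1_000;
>
> void update(Set<String> names) {
>     // Safeguard 1: don't start while a full fetch is in progress.
>     if (fullFetchInProgress.get()) {
>         return;
>     }
>     // Safeguard 2: enforce a minimum interval between fetches.
>     long now = System.currentTimeMillis();
>     if (now - lastFetchTimeMillis < MIN_FETCH_INTERVAL_MS) {
>         return;
>     }
>     lastFetchTimeMillis = now;
>     fetchFilteredMetrics(names); // hypothetical helper that does the RPCs
> }
>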
> Results:
> Since implementing this solution, we've been able to scale jobs with more
> than 2,000 subtasks, making scaling decisions (and retrieving metrics)
> every 10 seconds, which was previously impossible due to timeouts and
> latency.
>
> Questions:
> Has anyone else encountered similar challenges with high-parallelism jobs
> and the current metrics fetching process? If so, how have you solved it?
>
> Additionally, we’d love to hear your thoughts on whether this solution
> could benefit the wider Flink OSS community. Would this approach be
> something worth formally proposing to Flink OSS? Any guidance on how to
> proceed would be greatly appreciated.
>
> Looking forward to hearing from you all!
>
> Best,
>
> Francesco
>
