Hi all,

I’d like to discuss an issue we encountered while using the Flink OSS
autoscaler with jobs that have high parallelism (1k+ subtasks). I’ll
share the solution we've implemented internally and would like to hear
whether others in the community have faced similar challenges. I’m also
curious whether our solution could be a useful contribution to Flink OSS.

The Issue:
While analyzing autoscaling decisions for high-parallelism jobs, we noticed
frequent connection timeouts when the autoscaler tried to fetch
vertex-level metrics, even after increasing the connection timeout to one
minute. Through flamegraph analysis, we identified that the root cause lies
in the current metrics refresh process in the JM, which is triggered almost
every time any /metrics endpoint is called. This process is synchronous,
waiting for metrics from all TMs before aggregation.

While this approach is simple, it's inefficient for clients like the
autoscaler or the Web UI, which typically require only a small subset of
metrics (fewer than 20). For larger jobs, the full refresh can involve
thousands of metrics, introducing significant latency—sometimes over a
minute—and leading to connection timeouts. These delays disrupt the
autoscaler's ability to make timely scaling decisions.
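
To make the pattern concrete, here is a heavily simplified sketch of the
behaviour as we understand it; the class and method names below are made
up for illustration and are not the actual JM code:

// Heavily simplified, illustrative sketch of the current full-refresh
// behaviour; these names do not correspond to the actual Flink classes.
import java.util.List;

public class FullRefreshSketch {

    interface TaskManagerGateway {
        List<String> queryAllMetrics();
    }

    /** Invoked on essentially every /metrics REST call. */
    public void update(List<TaskManagerGateway> taskManagers) {
        for (TaskManagerGateway tm : taskManagers) {
            // Pulls *all* metrics from every TM and waits for the results
            // before aggregating, even when the caller needs fewer than 20.
            aggregate(tm.queryAllMetrics());
        }
    }

    private void aggregate(List<String> metrics) {
        // Placeholder: merge the results into the JM-side metric store.
    }
}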

Our Solution:
To address this, we added a new method to the MetricFetcher interface to
allow clients to fetch only the metrics they need, instead of fetching all
metrics at once:

/** Fetch metrics based on the provided set of metric names. */
void update(Set<String> names);

This change significantly reduces the data transferred, speeds up
aggregation, and minimizes the risk of timeouts. We also kept the original
update() method for backward compatibility when filtering by name isn't
feasible.
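
As an illustration of the intended usage, a client such as the autoscaler
would request only the handful of metrics it actually reads. The helper
class below is hypothetical and only the update(Set<String>) call reflects
the proposed change; the metric names are examples of standard task-level
Flink metrics:

// Illustrative caller-side usage of the proposed filtered update.
import java.util.HashSet;
import java.util.Set;

public class AutoscalerMetricPoll {

    // Minimal stand-in for the extended MetricFetcher described above;
    // only the proposed update(Set<String> names) method is shown here.
    interface MetricFetcher {
        void update(Set<String> names);
    }

    public static void refreshNeededMetrics(MetricFetcher metricFetcher) {
        Set<String> needed = new HashSet<>();
        needed.add("busyTimeMsPerSecond");
        needed.add("backPressuredTimeMsPerSecond");
        needed.add("numRecordsInPerSecond");

        // Refresh only the requested metrics instead of triggering a full
        // cluster-wide refresh of every registered metric.
        metricFetcher.update(needed);
    }
}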

Additionally, we introduced the following safeguards (sketched below) to
protect the JM and TMs against OOM issues:

1. A new fetch is not triggered if a full fetch is already in progress.
2. A minimum interval is enforced between fetches, to avoid overloading
the system.
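
For reference, here is a rough sketch of how the two safeguards can sit in
front of the filtered fetch; the class, field, and method names below are
purely illustrative and not the actual Flink implementation:

// Rough sketch of the two safeguards; all names here are illustrative.
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class GuardedMetricFetcher {

    // Set/cleared by the original full update() path (not shown here).
    private final AtomicBoolean fullFetchInProgress = new AtomicBoolean(false);
    private final AtomicLong lastFetchTimestampMs = new AtomicLong(0L);
    private final long minFetchIntervalMs;

    public GuardedMetricFetcher(long minFetchIntervalMs) {
        this.minFetchIntervalMs = minFetchIntervalMs;
    }

    /** Fetch only the named metrics, unless one of the guards rejects the call. */
    public void update(Set<String> names) {
        // Safeguard 1: skip if a full fetch is already in progress.
        if (fullFetchInProgress.get()) {
            return;
        }
        // Safeguard 2: enforce a minimum interval between fetches.
        long now = System.currentTimeMillis();
        long last = lastFetchTimestampMs.get();
        if (now - last < minFetchIntervalMs
                || !lastFetchTimestampMs.compareAndSet(last, now)) {
            return;
        }
        fetchFromTaskManagers(names);
    }

    private void fetchFromTaskManagers(Set<String> names) {
        // Placeholder for the RPC that queries only the named metrics from
        // the TMs and merges them into the metric store.
    }
}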

Results:
Since implementing this solution, we've been able to scale jobs with more
than 2,000 subtasks, making scaling decisions (and retrieving metrics)
every 10 seconds, which was previously not possible due to timeouts and
latency.

Questions:
Has anyone else encountered similar challenges with high-parallelism jobs
and the current metrics fetching process? If so, how have you solved it?

Additionally, we’d love to hear your thoughts on whether this solution
could benefit the wider Flink OSS community. Would this approach be
something worth formally proposing to Flink OSS? Any guidance on how to
proceed would be greatly appreciated.

Looking forward to hearing from you all!

Best,

Francesco
