Hi Francesco! I think this would be a reasonable and nice improvement!
Cheers,
Gyula

On Sat, Mar 8, 2025 at 6:17 AM Francesco D <francescodch...@gmail.com> wrote:

> Hi all,
>
> I’d like to discuss an issue we encountered while using the Flink OSS
> autoscaler with jobs that have high parallelism (1k+ subtasks). I’ll share
> the solution we've implemented internally and would like to hear whether
> others in the community have faced similar challenges. I’m also curious
> whether our solution could be a useful contribution to Flink OSS.
>
> The Issue:
> While analyzing autoscaling decisions for high-parallelism jobs, we
> noticed frequent connection timeouts when the autoscaler tried to fetch
> vertex-level metrics, even after increasing the connection timeout to one
> minute. Through flamegraph analysis, we identified that the root cause
> lies in the current metrics refresh process in the JM, which is triggered
> almost every time any /metrics endpoint is called. This process is
> synchronous, waiting for metrics from all TMs before aggregation.
>
> While this approach is simple, it is inefficient for clients like the
> autoscaler or the Web UI, which typically require only a small subset of
> metrics (fewer than 20). For larger jobs, the full refresh can involve
> thousands of metrics, introducing significant latency (sometimes over a
> minute) and leading to connection timeouts. These delays disrupt the
> autoscaler's ability to make timely scaling decisions.
>
> Our Solution:
> To address this, we added a new method to the MetricFetcher interface
> that allows clients to fetch only the metrics they need, instead of
> fetching all metrics at once:
>
> /** Fetch metrics based on the provided set of metric names. */
> void update(Set<String> names);
>
> This change significantly reduces the data transferred, speeds up
> aggregation, and minimizes the risk of timeouts. We also kept the original
> update() method for backward compatibility when filtering by name isn't
> feasible.
>
> Additionally, we introduced the following safeguards to protect the JM/TM
> against OOM issues:
>
> 1. A new fetch won’t trigger if a full fetch is already in progress.
> 2. A minimum interval is enforced between fetches, preventing the system
> from being overloaded.
>
> Results:
> Since implementing this solution, we've been able to scale jobs with more
> than 2,000 subtasks, enabling scaling decisions (and metric retrieval)
> every 10 seconds, which was previously hindered by timeouts and latency.
>
> Questions:
> Has anyone else encountered similar challenges with high-parallelism jobs
> and the current metrics fetching process? If so, how have you solved it?
>
> Additionally, we’d love to hear your thoughts on whether this solution
> could benefit the wider Flink OSS community. Would this approach be worth
> formally proposing to Flink OSS? Any guidance on how to proceed would be
> greatly appreciated.
>
> Looking forward to hearing from you all!
>
> Best,
>
> Francesco
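For illustration, here is a minimal sketch of how the name-filtered fetch and the two safeguards described above could fit together. The class, field, and method names and the interval value are hypothetical, not taken from the actual internal implementation or from Flink's MetricFetcher code:

    import java.util.Set;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    /**
     * Sketch of a fetcher with a name-filtered update and the two
     * safeguards from the proposal. Names and values are illustrative.
     */
    public class FilteredMetricFetcher {

        // Hypothetical minimum interval between fetches (safeguard 2).
        private static final long MIN_FETCH_INTERVAL_MS = 1_000;

        private final AtomicBoolean fullFetchInProgress = new AtomicBoolean(false);
        private final AtomicLong lastFetchTimeMs = new AtomicLong(0);

        /** Existing behavior: refresh all metrics from every TM. */
        public void update() {
            if (!fullFetchInProgress.compareAndSet(false, true)) {
                return; // a full fetch is already running
            }
            try {
                fetchFromTaskManagers(null); // null => no filter, fetch everything
            } finally {
                fullFetchInProgress.set(false);
            }
        }

        /** Proposed: fetch only the metrics matching the given names. */
        public void update(Set<String> names) {
            // Safeguard 1: skip if a full fetch is already in progress.
            if (fullFetchInProgress.get()) {
                return;
            }
            // Safeguard 2: enforce a minimum interval between fetches;
            // the compareAndSet also resolves concurrent callers.
            long now = System.currentTimeMillis();
            long last = lastFetchTimeMs.get();
            if (now - last < MIN_FETCH_INTERVAL_MS
                    || !lastFetchTimeMs.compareAndSet(last, now)) {
                return;
            }
            fetchFromTaskManagers(names);
        }

        private void fetchFromTaskManagers(Set<String> names) {
            // Placeholder: the real fetcher would query the TMs (filtered by
            // name when 'names' is non-null) and aggregate the results.
        }
    }

With a guard like this, an autoscaler polling a small, fixed set of vertex metrics every few seconds would only ever transfer and aggregate those metrics, while any concurrently triggered full refresh still takes precedence.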