Hi all, I'd like to discuss an issue we encountered while using the Flink OSS autoscaler with jobs that have high parallelism (1k+ subtasks). I'll share the solution we've implemented internally and would like to hear if others in the community have faced similar challenges. I'm also curious whether our solution could be a useful contribution to Flink OSS.
The Issue:

While analyzing autoscaling decisions for high-parallelism jobs, we noticed frequent connection timeouts when the autoscaler tried to fetch vertex-level metrics, even after increasing the connection timeout to one minute. Through flamegraph analysis, we identified the root cause in the current metrics refresh process in the JobManager (JM), which is triggered almost every time any /metrics endpoint is called. This process is synchronous: it waits for metrics from all TaskManagers (TMs) before aggregating them. While this approach is simple, it is inefficient for clients like the autoscaler or the Web UI, which typically require only a small subset of metrics (fewer than 20). For larger jobs, the full refresh can involve thousands of metrics, introducing significant latency (sometimes over a minute) and leading to connection timeouts. These delays disrupt the autoscaler's ability to make timely scaling decisions.

Our Solution:

To address this, we added a new method to the MetricFetcher interface that allows clients to fetch only the metrics they need, instead of fetching all metrics at once:

    /** Fetch metrics based on the provided set of metric names. */
    void update(Set<String> names);

This change significantly reduces the data transferred, speeds up aggregation, and minimizes the risk of timeouts. We also kept the original update() method for backward compatibility when filtering by name isn't feasible. Additionally, we introduced the following safeguards to protect the JM/TMs against OOM issues (a rough sketch follows in the P.S. below):

1. A new fetch is not triggered while a full fetch is already in progress.
2. A minimum interval is enforced between fetches, so the system cannot be overloaded.

Results:

Since implementing this solution, we have been able to scale jobs with more than 2,000 subtasks, making scaling decisions (and retrieving metrics) every 10 seconds, which was previously prevented by timeouts and latency.

Questions:

Has anyone else encountered similar challenges with high-parallelism jobs and the current metrics fetching process? If so, how have you solved them? We would also love to hear your thoughts on whether this solution could benefit the wider Flink OSS community. Would this approach be worth formally proposing to Flink OSS? Any guidance on how to proceed would be greatly appreciated.

Looking forward to hearing from you all!

Best,
Francesco
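P.S. For concreteness, here is a minimal, self-contained sketch (plain Java) of the name-filtered fetch plus the two safeguards described above. This is not our actual internal patch: apart from the MetricFetcher name and the proposed update(Set<String>) signature, all class and method names are hypothetical and only meant to illustrate the idea; the real change lives inside the JM's metric fetching code rather than in a standalone wrapper.

import java.time.Clock;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified stand-in for Flink's MetricFetcher, extended with a name-filtered update. */
interface MetricFetcher {
    /** Existing behaviour: refresh all metrics from all TMs (kept for backward compatibility). */
    void update();

    /** Proposed addition: refresh only the metrics whose names are in the given set. */
    void update(Set<String> names);
}

/**
 * Hypothetical guard in front of a fetcher, illustrating the two safeguards from the mail:
 *  1. a new fetch is skipped while a full fetch is still in progress;
 *  2. a minimum interval is enforced between consecutive fetches.
 */
final class GuardedMetricFetcher implements MetricFetcher {

    private final MetricFetcher delegate;
    private final Duration minInterval;
    private final Clock clock;

    private final AtomicBoolean fullFetchInProgress = new AtomicBoolean(false);
    private final AtomicLong lastFetchMillis = new AtomicLong(Long.MIN_VALUE);

    GuardedMetricFetcher(MetricFetcher delegate, Duration minInterval, Clock clock) {
        this.delegate = delegate;
        this.minInterval = minInterval;
        this.clock = clock;
    }

    @Override
    public void update() {
        // Only one full fetch at a time.
        if (!fullFetchInProgress.compareAndSet(false, true)) {
            return;
        }
        try {
            if (rateLimitAllows()) {
                delegate.update();
            }
        } finally {
            fullFetchInProgress.set(false);
        }
    }

    @Override
    public void update(Set<String> names) {
        // Safeguard 1: don't pile a filtered fetch on top of an ongoing full fetch.
        if (fullFetchInProgress.get()) {
            return;
        }
        // Safeguard 2: enforce a minimum interval between fetches.
        if (rateLimitAllows()) {
            delegate.update(names);
        }
    }

    /** Returns true (and records the timestamp) if enough time has passed since the last fetch. */
    private boolean rateLimitAllows() {
        long now = clock.millis();
        long last = lastFetchMillis.get();
        if (last != Long.MIN_VALUE && now - last < minInterval.toMillis()) {
            return false;
        }
        return lastFetchMillis.compareAndSet(last, now);
    }
}

In this sketch the safeguards sit in a decorator so that the unfiltered update() path stays available unchanged for clients that can't filter by name, which is the backward-compatibility property described above; the filtered path is what a frequent caller like the autoscaler would use.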