Hi Gyula,

Thanks for the response. I agree with your idea overall.

In general, for simple operators like filter or map, busyness should correspond to CPU utilization roughly one-to-one. For sinks or operators that interact with external systems, this may not hold true. Similarly, for stateful operators, state access latency can contribute to busyness without reflecting actual CPU usage, especially if state is spilled to disk through RocksDB.

As a first step, we could assume a one-to-one mapping between busyness and CPU utilization, since task-level CPU utilization metrics are not readily available today. Once these metrics are exposed at the task level, we can build heuristics to determine the optimal target utilization for different operator types, considering real CPU usage in addition to load.

For example, take the earlier minimal case where two tasks each report 50 percent busyness, but the actual CPU usage is only 1 percent for both the source and the sink. We could safely allow them to run at 70 percent busyness without overusing CPU resources. With the proposed solution, the autoscaler would assign a budget of 35 percent utilization to each task (the 70 percent target split across the two colocated tasks). Factoring in real CPU usage, 35 percent busyness corresponds to only 0.7 percent CPU utilization. To actually reach 35 percent CPU utilization, the load could be 50 times higher. However, since each subtask is single-threaded, the load should still be capped at the 70 percent busyness target. In this case, the effective utilization target would be 70 percent for each task, which is the same as the current autoscaler behavior.

As another example, if the two tasks again report 50 percent busyness but the actual CPU usage is 40 percent, the proposed solution would still give a 35 percent budget to each task. With real CPU usage considered, the budget could be increased by a factor of 1.25 (50 / 40), resulting in a target utilization of 0.35 × 1.25 = 0.4375. Since this is below the 70 percent cap, we would use this number.
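To make the arithmetic in the two examples concrete, here is a minimal sketch in Python. It is only an illustration of the heuristic as I described it, not autoscaler code: the function name `effective_target` and its parameters are made up for this mail, and it assumes the per-task budget is simply the 70 percent target divided equally among the colocated tasks.

```python
def effective_target(busyness, cpu_usage, colocated_tasks, target=0.70):
    """Hypothetical helper (not a Flink API): per-task busyness target.

    busyness        -- busy-time fraction reported by the task (0..1)
    cpu_usage       -- measured CPU fraction for the task (0..1); assumes
                       such a task-level metric is available
    colocated_tasks -- number of tasks sharing the same slot / vCore
    target          -- configured utilization target (0.70 in the examples)
    """
    if cpu_usage <= 0 or colocated_tasks <= 0:
        raise ValueError("cpu_usage and colocated_tasks must be positive")
    # Assumption: the target is split equally among colocated tasks.
    budget = target / colocated_tasks
    # Busy time reported per unit of real CPU; 1.0 means one-to-one.
    scale = busyness / cpu_usage
    # A single-threaded subtask is still capped at the busyness target.
    return min(budget * scale, target)


# Example 1: 50 percent busy, 1 percent CPU, two colocated tasks.
print(f"{effective_target(0.50, 0.01, 2):.4f}")  # 0.7000
# Example 2: 50 percent busy, 40 percent CPU, two colocated tasks.
print(f"{effective_target(0.50, 0.40, 2):.4f}")  # 0.4375
```

In the first case the scaled budget (0.35 × 50) is far above the cap, so the cap applies. In the second case the scaled budget stays below the cap and is used directly.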
For next steps, let me know if it makes sense to create a FLIP for these proposed changes or if a JIRA would be sufficient. I am also testing the proposed algorithm internally at my company and will put together a draft PR so the details become clearer.

Cheers,
Sharath

On Wed, Sep 3, 2025 at 1:14 AM Gyula Fóra <[email protected]> wrote:

> Hi!
> Thanks, this is clearer now.
>
> I think getting a good solution for this can be tricky because we also do
> not want to overprovision parallelism / resources unnecessarily. There are
> many cases when multiple tasks that are not in the same group can all run
> close to 100 percent busyness when the pipeline is not CPU bound. It's a
> bit hard to attribute individual tasks' CPU usage contribution, so probably
> some heuristic approach would work best to get started with and then we can
> iterate on this.
>
> Instead of simply aggregating the load, how about factoring in actual CPU
> utilization numbers (as long as we can get this)? For example if we have
> CPU at 50%, we know that we could still double the parallelism of
> everything in theory (even if the individual task busyness values already
> add up to more than 100%).
>
> As we work toward a generally applicable solution we will also have to
> consider exposing this to the users somehow so they can control how the
> aggregation should happen, with some multiplier relative to the CPU
> utilization or something like that.
>
> What do you think?
>
> Gyula
>
> On Tue, Sep 2, 2025 at 10:38 PM Sharath <[email protected]> wrote:
>
> > Hi Gyula,
> >
> > Thanks for the response, let me clarify a bit.
> >
> > The issue is not that Flink’s per task metrics like busyness, idleness,
> > or backpressure are incorrect. They are doing what they are supposed to
> > do. The challenge comes when we try to interpret them in aggregate for
> > autoscaling.
> >
> > Right now, the autoscaler assumes that each task has access to one full
> > vCore.
> > In practice:
> >
> > - With slot sharing, multiple subtasks can run on the same TaskManager,
> >   sharing the same vCore. Their reported busyness values do not reflect
> >   this contention.
> > - When cgroups are disabled, TaskManagers can burst above their
> >   allocation, so task level metrics look fine even though the cluster is
> >   oversubscribed.
> > - When cgroups are enabled, the opposite happens. The autoscaler may try
> >   to drive tasks toward a utilization target that is not physically
> >   achievable because the target is set at the task level rather than the
> >   TaskManager level.
> >
> > Minimal example
> >
> > [image: Brainstorming and ideation.png]
> >
> > Suppose we have 2 TaskManagers, each with 1 slot, 1 vCore, and slot
> > sharing enabled. We run a job with 4 subtasks. Flink places 2 subtasks on
> > each TaskManager. For simplicity, let's assume all the subtasks have
> > equal processing capacity, throughput, and busyness.
> >
> > - Each task reports about 50 percent busy time. From the autoscaler’s
> >   perspective, this looks underutilized since 50 percent is below the 70
> >   percent target.
> > - At the TaskManager level, the two subtasks together are already
> >   saturating the single vCore, excluding any background threads. In
> >   reality, there is no headroom left.
> >
> > In this case, the metrics are individually correct, but the autoscaler
> > makes the wrong decision because it interprets them per task without any
> > CPU awareness. The parallelism determined by the autoscaler would only be
> > correct if each TaskManager were vertically scaled to match the number of
> > tasks it is running, but such a capability does not exist. Users cannot
> > reason about job efficiency in a consistent way across large and diverse
> > workloads.
> >
> > That is why I was thinking of shifting the focus to TaskManager level
> > aggregation, summing colocated tasks’ loads so we scale on something
> > closer to real CPU usage. The current autoscaling algorithm can also
> > introduce CPU skew across TaskManagers since it does not converge to a
> > configuration that uses equal parallelism for tasks sharing the same
> > slot. The proposed algorithm would converge to a configuration that
> > minimizes CPU skew. Of course, this would require rigorous theoretical
> > validation, but I wanted to gain some acknowledgment from the community
> > regarding existing drawbacks.
> >
> > Cheers,
> > Sharath
> >
> > On Tue, Sep 2, 2025 at 6:05 AM Gyula Fóra <[email protected]> wrote:
> >
> >> Hey!
> >>
> >> I am having a bit of trouble understanding the problem with the reported
> >> metrics. I assume if something is off that means that the Flink reported
> >> busyness/idleness/backpressure metrics must be off for a given job.
> >>
> >> Could you please give us a small/minimal example of how this would be
> >> the case?
> >> Would it make sense to try to fix the problem with the Flink metrics
> >> themselves instead of coming up with strategies on the autoscaler side
> >> to work around the incorrect metrics?
> >>
> >> I think the autoscaler algorithm is sound but it assumes fundamentally
> >> correct metrics, so fixing those may be the simpler solution.
> >>
> >> Cheers
> >> Gyula
> >>
> >> On Fri, Aug 22, 2025 at 1:02 AM Sharath <[email protected]> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I’ve been looking into how the autoscaler behaves with jobs that have
> >> > a large number of tasks and wanted to share some thoughts to start a
> >> > discussion.
> >> >
> >> > The problem
> >> >
> >> > Right now, the autoscaler implicitly assumes that each task gets a
> >> > full second of processing time. While this works in simple cases where
> >> > there is only one task, it breaks down in other situations.
> >> >
> >> > If cgroups are not enabled on YARN, TaskManagers can use much more
> >> > than their allocated 1 vCore, leading to overallocation. When cgroups
> >> > are enabled, the autoscaler may try to maintain a target utilization
> >> > that is not realistically achievable due to CPU constraints.
> >> > Backlog-based scaling might keep the job running without lag, but the
> >> > underlying behaviour is hard to reason about.
> >> >
> >> > Another challenge is that the target utilization metric does not
> >> > reflect actual CPU usage. This makes it hard for users to understand
> >> > how efficient their job really is or to tune the utilization threshold
> >> > effectively across a large number of jobs. I believe this is a
> >> > fundamental limitation of the original research paper
> >> > <https://www.usenix.org/system/files/osdi18-kalavri.pdf> that the
> >> > Flink autoscaler is based on. In a way, the research paper implicitly
> >> > assumes that slot-sharing
> >> > <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/>
> >> > is disabled in Flink.
> >> >
> >> > Potential Solution
> >> >
> >> > One idea is to make the autoscaler aware of how tasks are placed and
> >> > shift the focus to TaskManager-level utilization instead of
> >> > task-level.
> >> >
> >> > For example, if sub-tasks a1, b2, and c5 are colocated on the same
> >> > TaskManager, their individual loads should be summed to represent the
> >> > total pressure on that TaskManager. The autoscaler could then scale
> >> > based on the average or peak pressure across all TaskManagers rather
> >> > than treating tasks in isolation.
> >> >
> >> > A possible approach would be:
> >> >
> >> > - Use existing metrics like busy time and backpressure to estimate
> >> >   per-task load.
> >> > - Group colocated tasks and aggregate their loads by TaskManager.
> >> > - Calculate TaskManager-level utilization and use that as the signal
> >> >   for scaling.
> >> >
> >> > This approach will not capture everything, such as background threads
> >> > or garbage collection, but it should be a step closer to aligning with
> >> > real CPU usage patterns.
> >> >
> >> > Next steps
> >> >
> >> > I have not worked out all the implementation details yet, and I admit
> >> > this approach adds significantly more complexity than I'd like. But
> >> > given how Flink places and runs tasks with slot sharing, I think the
> >> > extra complexity might be necessary.
> >> >
> >> > I would love to get feedback from others before going further with a
> >> > design doc. Let me know what you think.
> >> >
> >> > Thanks,
> >> > Sharath
