Hi!
Thanks, this is clearer now.

I think getting a good solution for this can be tricky because we also do
not want to overprovision parallelism / resources unnecessarily. There are
many cases where multiple tasks that are not in the same group can all run
close to 100 percent busyness when the pipeline is not CPU bound. It's a
bit hard to attribute CPU usage to individual tasks, so a heuristic
approach would probably work best to get started with, and then we can
iterate on this.

Instead of simply aggregating the load, how about factoring in actual CPU
utilization numbers (as long as we can get them)? For example, if we have
CPU at 50%, we know that we could still double the parallelism of
everything in theory (even if the individual task busyness values already
add up to more than 100%).
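
To make the CPU idea concrete, here is a rough sketch of the arithmetic
(Python, purely illustrative). The helper name `cpu_headroom_factor` and its
parameters are made up for this example and are not existing autoscaler
code; it assumes we can observe CPU utilization as a fraction of the
allocated cores.

```python
def cpu_headroom_factor(cpu_utilization: float,
                        target_utilization: float = 1.0) -> float:
    """Return how many times the current load fits before CPU hits the target.

    Both arguments are fractions of the allocated CPU (0.5 means 50%).
    Hypothetical helper for illustration, not part of the Flink autoscaler.
    """
    if not 0.0 < cpu_utilization <= 1.0:
        raise ValueError("cpu_utilization must be in (0, 1]")
    if not 0.0 < target_utilization <= 1.0:
        raise ValueError("target_utilization must be in (0, 1]")
    return target_utilization / cpu_utilization


# Assumed scenario: two colocated tasks, each reporting 80% busy time,
# while the measured CPU utilization of the TaskManager is only 50%.
summed_busyness = 0.8 + 0.8          # above 100%, looks oversubscribed
headroom = cpu_headroom_factor(0.5)  # CPU says the load could still double
```

With CPU at 50% the factor comes out as 2.0, so parallelism could in theory
double even though the summed busyness is already above 100%.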

As we work toward a generally applicable solution we will also have to
consider exposing this to the users somehow so they can control how the
aggregation should happen, with some multiplier relative to the CPU
utilization or something like that.

What do you think?

Gyula

On Tue, Sep 2, 2025 at 10:38 PM Sharath <[email protected]> wrote:

> Hi Gyula,
>
> Thanks for the response, let me clarify a bit.
>
> The issue is not that Flink’s per task metrics like busyness, idleness, or
> backpressure are incorrect. They are doing what they are supposed to do.
> The challenge comes when we try to interpret them in aggregate for
> autoscaling.
>
> Right now, the autoscaler assumes that each task has access to one full
> vCore. In practice:
>
>    - With slot sharing, multiple subtasks can run on the same TaskManager,
>      sharing the same vCore. Their reported busyness values do not reflect
>      this contention.
>    - When cgroups are disabled, TaskManagers can burst above their
>      allocation, so task level metrics look fine even though the cluster is
>      oversubscribed.
>    - When cgroups are enabled, the opposite happens. The autoscaler may try
>      to drive tasks toward a utilization target that is not physically
>      achievable because the target is set at the task level rather than the
>      TaskManager level.
>
> Minimal example
>
> [image: Brainstorming and ideation.png]
>
> Suppose we have 2 TaskManagers, each with 1 slot, 1 vCore, and slot
> sharing enabled. We run a job with 4 subtasks. Flink places 2 subtasks on
> each TaskManager. For simplicity, let's assume all the subtasks have equal
> processing capacity, throughput, and busyness.
>
>    - Each task reports about 50 percent busy time. From the autoscaler’s
>      perspective, this looks underutilized since 50 percent is below the 70
>      percent target.
>    - At the TaskManager level, the two subtasks together are already
>      saturating the single vCore, excluding any background threads. In
>      reality, there is no headroom left.
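
Spelling out the arithmetic of this example as a small Python sketch (the
variable names are only for illustration; the numbers are the ones from the
example above):

```python
TARGET_UTILIZATION = 0.70   # task-level target from the example
VCORES_PER_TM = 1.0         # each TaskManager has 1 vCore
busy_per_subtask = 0.50     # each subtask reports about 50% busy time
subtasks_per_tm = 2         # 4 subtasks spread over 2 TaskManagers

# Per-task view: 0.50 is below 0.70, so the job looks underutilized.
looks_underutilized = busy_per_subtask < TARGET_UTILIZATION

# TaskManager view: the two colocated subtasks share a single vCore.
tm_utilization = subtasks_per_tm * busy_per_subtask / VCORES_PER_TM
has_headroom = tm_utilization < 1.0
```

The per-task check suggests there is room to scale down, while the
TaskManager-level number is already at 100 percent of the vCore.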
>
> In this case, the metrics are individually correct, but the autoscaler
> makes the wrong decision because it interprets them per task without any
> CPU awareness. The parallelism determined by the autoscaler would only be
> correct if each TaskManager were vertically scaled to match the number of
> tasks it is running, but such a capability does not exist. Users cannot
> reason about job efficiency in a consistent way across large and diverse
> workloads.
>
> That is why I was thinking of shifting the focus to TaskManager level
> aggregation, summing colocated tasks’ loads so we scale on something closer
> to real CPU usage. The current autoscaling algorithm can also introduce CPU
> skew across TaskManagers since it does not converge to a configuration that
> uses equal parallelism for tasks sharing the same slot. The proposed
> algorithm would converge to a configuration that minimizes CPU skew. Of
> course, this would require rigorous theoretical validation, but I wanted to
> gain some acknowledgment from the community regarding existing drawbacks.
>
> Cheers,
> Sharath
>
> On Tue, Sep 2, 2025 at 6:05 AM Gyula Fóra <[email protected]> wrote:
>
>> Hey!
>>
>> I am having a bit of trouble understanding the problem with the reported
>> metrics. I assume if something is off that means that the Flink reported
>> busyness/idleness/backpressure metrics must be off for a given job.
>>
>> Could you please give us a small/minimal example of how this would be the
>> case?
>> Would it make sense to try to fix the problem with the Flink metrics
>> themselves instead of coming up with strategies on the autoscaler side to
>> work around the incorrect metrics?
>>
>> I think the autoscaler algorithm is sound but it assumes fundamentally
>> correct metrics, so fixing those may be the simpler solution.
>>
>> Cheers
>> Gyula
>>
>> On Fri, Aug 22, 2025 at 1:02 AM Sharath <[email protected]> wrote:
>>
>> > Hi all,
>> >
>> > I’ve been looking into how the autoscaler behaves with jobs that have a
>> > large number of tasks and wanted to share some thoughts to start a
>> > discussion.
>> >
>> > The problem
>> >
>> > Right now, the autoscaler implicitly assumes that each task gets a full
>> > second of processing time. While this works in simple cases where there is
>> > only one task, it breaks down in other situations.
>> >
>> > If cgroups are not enabled on YARN, TaskManagers can use much more than
>> > their allocated 1 vCore, leading to overallocation. When cgroups are
>> > enabled, the autoscaler may try to maintain a target utilization that is
>> > not realistically achievable due to CPU constraints. Backlog-based scaling
>> > might keep the job running without lag, but the underlying behaviour is
>> > hard to reason about.
>> >
>> > Another challenge is that the target utilization metric does not reflect
>> > actual CPU usage. This makes it hard for users to understand how efficient
>> > their job really is or to tune the utilization threshold effectively across
>> > a large number of jobs. I believe this is a fundamental limitation of the
>> > original research paper
>> > <https://www.usenix.org/system/files/osdi18-kalavri.pdf> that the Flink
>> > autoscaler is based on. In a way, the research paper implicitly assumes
>> > that slot-sharing
>> > <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/>
>> > is disabled in Flink.
>> >
>> > Potential Solution
>> >
>> > One idea is to make the autoscaler aware of how tasks are placed and shift
>> > the focus to TaskManager-level utilization instead of task-level.
>> >
>> > For example, if sub-tasks a1, b2, and c5 are colocated on the same
>> > TaskManager, their individual loads should be summed to represent the total
>> > pressure on that TaskManager. The autoscaler could then scale based on the
>> > average or peak pressure across all TaskManagers rather than treating tasks
>> > in isolation.
>> >
>> > A possible approach would be:
>> >
>> >    - Use existing metrics like busy time and backpressure to estimate
>> >      per-task load.
>> >    - Group colocated tasks and aggregate their loads by TaskManager.
>> >    - Calculate TaskManager-level utilization and use that as the signal
>> >      for scaling.
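
The three steps above could look roughly like this in Python. This is only a
sketch under assumptions: per-task load is taken to be busy milliseconds per
second divided by 1000 (backpressure is ignored here), every TaskManager has
the same number of vCores, and the function names, subtask placement, and
busy values are invented for illustration.

```python
from collections import defaultdict
from statistics import mean


def taskmanager_utilization(subtasks, vcores_per_tm=1.0):
    """Sum per-subtask load by TaskManager and normalize by its vCores.

    `subtasks` is an iterable of (subtask_id, taskmanager_id, busy_ms_per_second).
    """
    load = defaultdict(float)
    for _subtask_id, tm_id, busy_ms in subtasks:
        load[tm_id] += busy_ms / 1000.0  # step 1: per-task load estimate
    # steps 2 and 3: aggregate by TaskManager, then normalize to utilization
    return {tm_id: total / vcores_per_tm for tm_id, total in load.items()}


def scaling_signal(utilization, mode="peak"):
    """Collapse per-TaskManager utilization into one signal (peak or average)."""
    values = list(utilization.values())
    if mode == "peak":
        return max(values)
    if mode == "average":
        return mean(values)
    raise ValueError("mode must be 'peak' or 'average'")


# Assumed placement: a1, b2 and c5 colocated on tm-1, two more subtasks on tm-2.
placement = [
    ("a1", "tm-1", 300), ("b2", "tm-1", 400), ("c5", "tm-1", 200),
    ("a2", "tm-2", 250), ("b3", "tm-2", 250),
]
utilization = taskmanager_utilization(placement)
peak = scaling_signal(utilization, "peak")
average = scaling_signal(utilization, "average")
```

In this made-up placement, tm-1 carries roughly 90 percent load and tm-2
roughly 50 percent, so the peak and average signals differ noticeably; which
one to scale on is exactly the kind of choice that would need discussion.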
>> >
>> > This approach will not capture everything, such as background threads or
>> > garbage collection, but it should be a step closer to aligning with real
>> > CPU usage patterns.
>> >
>> > Next steps
>> >
>> > I have not worked out all the implementation details yet, and I admit this
>> > approach adds significantly more complexity than I'd like. But given how
>> > Flink places and runs tasks with slot sharing, I think the extra complexity
>> > might be necessary.
>> >
>> > I would love to get feedback from others before going further with a design
>> > doc. Let me know what you think.
>> >
>> > Thanks,
>> > Sharath
>> >
>>
>
