Hi Talat,

Using the scale subresource for autoscaling makes a lot of sense to me.

> Could you be more specific about why you think changing the task manager
> count will not work for native deployment?


The native K8s integration uses the active ResourceManager, which means that
the TaskManager count is calculated as *parallelism / numTaskSlots*. If we
want to add more TaskManager pods, we need to increase the parallelism.
Unlike the standalone deployment, there is no way to directly configure the
TaskManager count.

Given that we cannot set the replicas of the TaskManager pods when using the
native K8s integration, we need to calculate and configure the parallelism as
[min/max]*replicas * numTaskSlots*. See the prototype in Gyula's PR[1].
So the problem is how to change the parallelism without creating the Flink
application again; we do not have a REST API for this.

[1]. https://github.com/apache/flink-kubernetes-operator/pull/227
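
For illustration, a minimal sketch of the arithmetic described above (the slot
count and replica values here are just examples):

    // With the active resource manager, the TaskManager count is derived from
    // the parallelism, so scaling to N TaskManager pods means setting the
    // parallelism to N * taskmanager.numberOfTaskSlots.
    int numTaskSlots = 2;          // taskmanager.numberOfTaskSlots
    int desiredTaskManagers = 4;   // e.g. what an HPA would ask for

    int parallelism = desiredTaskManagers * numTaskSlots;                  // 8
    int taskManagerPods = (parallelism + numTaskSlots - 1) / numTaskSlots; // back to 4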

Best,
Yang

On Wed, Jun 1, 2022 at 08:34 Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

> Hi Yang and Gyula,
>
> Yang, could you give a little more information? What prevents us from
> changing the task manager count? I am aware of Flink's ActiveResourceManager,
> but Flink only requests resources when it initializes a cluster.
> If we set
>
>    - jobmanager.scheduler: adaptive
>    - cluster.declarative-resource-management.enabled: true
>
> while deploying a Flink native cluster, then even though it is a native
> deployment, Flink will be able to add/remove task managers, because adding
> or removing a task manager is basically similar to recovering a failed task
> manager.
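>
> For illustration, a minimal sketch (assuming these options land in the Flink
> configuration the cluster is created with; plain string keys are used so it
> is not tied to a specific Flink version):
>
>     import org.apache.flink.configuration.Configuration;
>
>     Configuration conf = new Configuration();
>     // Use the adaptive scheduler so the running job can rescale to the
>     // slots that are currently available.
>     conf.setString("jobmanager.scheduler", "adaptive");
>     // Let the scheduler declare its resource requirements instead of fixing them.
>     conf.setString("cluster.declarative-resource-management.enabled", "true");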
>
> Could you be more specific about why you think changing the task manager
> count will not work for native deployment? I will not use reactive mode;
> scaling up or down will be handled by the HPA. We will define the scale
> subresource.[1] Users will give us a starting point such as replicaCount and
> a maximum such as maxReplicaCount, and the Flink cluster will be initialized
> with replicaCount TaskManagers.
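>
> As a rough illustration (hypothetical helper and parameter names, not the
> actual operator API), the operator-side mapping this implies might look like:
>
>     // Hypothetical: clamp the replica count requested through the scale
>     // subresource to the user-provided bounds and derive the parallelism,
>     // since native mode sizes the cluster from the parallelism.
>     static int parallelismFor(int requestedReplicas,
>                               int replicaCount,
>                               int maxReplicaCount,
>                               int numTaskSlots) {
>         int replicas =
>                 Math.max(replicaCount, Math.min(requestedReplicas, maxReplicaCount));
>         return replicas * numTaskSlots;
>     }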
>
> Gyula, I want to make the HPA part of FlinkDeployment and introduce
> autoscaling settings such as metric service endpoints and other defaults
> such as thresholds, to reduce complexity. Let me start implementing
> something after Yang's answer. When users enable autoscaling, we also need
> to set the scheduler and declarative resource management settings behind the
> scenes.
>
> Thanks
>
> [1]
> https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource
>
> On Mon, May 30, 2022 at 2:25 AM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> >
>> > I thought we could enable the adaptive scheduler, so adding or removing a
>> > task manager is the same as restarting a job when we use the adaptive
>> > scheduler. Am I missing anything?
>>
>>
>> It is true for standalone mode, since adding/removing a TaskManager pod is
>> fully controlled by users (or external tools).
>> But it is not valid for the native K8s integration[1]. Currently, we cannot
>> dynamically change the TaskManager pods once the job is running.
>>
>> I really hope the HPA could work for both standalone and native mode.
>>
>>
>> [1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
>>
>> Best,
>> Yang
>>
>> On Mon, May 30, 2022 at 12:23 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> > Hi Talat!
>> >
>> > Sorry for the late reply; I have been busy with some fixes for the release
>> > and with travelling.
>> >
>> > I think the Prometheus metrics integration sounds like a great idea that
>> > would cover the needs of most users.
>> > This way users can also integrate easily with custom Flink metrics.
>> >
>> > maxReplicas: We could add this easily to the taskManager resource specs
>> >
>> > Nice workflow picture; I would love to include this in the docs later. One
>> > minor comment: should the HPA be outside of the FlinkDeployment box?
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <tuya...@paloaltonetworks.com>
>> > wrote:
>> >
>> >> Hi Yang,
>> >>
>> >> I thought we could enable the adaptive scheduler, so adding or removing a
>> >> task manager is the same as restarting a job when we use the adaptive
>> >> scheduler. Am I missing anything?
>> >>
>> >> Thanks
>> >>
>> >> On Tue, May 24, 2022 at 8:16 PM Yang Wang <danrtsey...@gmail.com>
>> wrote:
>> >>
>> >> > Thanks for the interesting discussion.
>> >> >
>> >> > Compared with reactive mode, leveraging the flink-kubernetes-operator
>> >> > to do the job restarting/upgrading is another solution for autoscaling.
>> >> > Given that fully restarting a Flink application on K8s is not too slow,
>> >> > this is a reasonable approach.
>> >> > I really hope we can make some progress in this area.
>> >> >
>> >> > Best,
>> >> > Yang
>> >> >
>> >> > On Wed, May 25, 2022 at 09:04 Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >> >
>> >> >> Hi Talat!
>> >> >>
>> >> >> It would be great to have an HPA that works based on some Flink
>> >> >> throughput/backlog metrics. I wonder how you are going to access the
>> >> >> Flink metrics in the HPA; we might need some integration with the K8s
>> >> >> metrics system.
>> >> >> In any case, whether we need a FLIP or not depends on the complexity;
>> >> >> if it's simple then we can go without a FLIP.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
>> >> >> tuya...@paloaltonetworks.com>
>> >> >> wrote:
>> >> >>
>> >> >> > Hi Gyula,
>> >> >> >
>> >> >> > This seems very promising for initial scaling. We are using the Flink
>> >> >> > Kubernetes Operator; most probably we are very early adopters of it :)
>> >> >> > Let me try it and get back to you soon.
>> >> >> >
>> >> >> > My plan is to build a general-purpose CPU- and backlog/throughput-based
>> >> >> > autoscaler for Flink. I can create a custom open source HPA on top of
>> >> >> > your changes. Do I need to create a FLIP for it?
>> >> >> >
>> >> >> > Just some general information about us: today we use another execution
>> >> >> > environment if the job scheduler does not support autoscaling. Having
>> >> >> > an HPA works if your sources are well balanced. If there is an uneven
>> >> >> > distribution across sources, having an autoscaling feature in the
>> >> >> > scheduler can help with better utilization. But this is not urgent; we
>> >> >> > can start using your PR at least for a while.
>> >> >> >
>> >> >> > Thanks
>> >> >> >
>> >> >> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com>
>> >> >> wrote:
>> >> >> >
>> >> >> >> Hi Talat!
>> >> >> >>
>> >> >> >> One other approach that we are investigating currently is combining
>> >> >> >> the Flink Kubernetes Operator
>> >> >> >> <https://github.com/apache/flink-kubernetes-operator> with the K8s
>> >> >> >> scaling capabilities (Horizontal Pod Autoscaler).
>> >> >> >>
>> >> >> >> In this approach the HPA monitors the TaskManager pods directly and
>> >> >> >> can modify the FlinkDeployment resource replica number to trigger a
>> >> >> >> stateful job scale-up/down through the operator.
>> >> >> >> Obviously it is not as nice as the reactive mode, but it works easily
>> >> >> >> with the current Kubernetes native implementation. It is also
>> >> >> >> theoretically possible to integrate this with other custom Flink
>> >> >> >> metrics, but we haven't tested that yet.
>> >> >> >>
>> >> >> >> I have created a POC pull request that showcases these
>> >> >> >> capabilities:
>> >> >> >> https://github.com/apache/flink-kubernetes-operator/pull/227
>> >> >> >>
>> >> >> >> If you are interested, it would be nice if you could check it out and
>> >> >> >> provide feedback; we will get back to refining this after our current
>> >> >> >> ongoing release.
>> >> >> >>
>> >> >> >> Cheers,
>> >> >> >> Gyula
>> >> >> >>
>> >> >> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org>
>> >> >> wrote:
>> >> >> >>
>> >> >> >>> Hi Talat,
>> >> >> >>>
>> >> >> >>> This is definitely an interesting and rather complex topic.
>> >> >> >>>
>> >> >> >>> Few unstructured thoughts / notes / questions:
>> >> >> >>>
>> >> >> >>> - The main struggle has always been that it's hard to come up with a
>> >> >> >>> generic, one-size-fits-all metric for autoscaling.
>> >> >> >>>   - Flink doesn't have knowledge of the external environment (e.g.
>> >> >> >>> capacity planning on the cluster, no notion of pre-emption), so it
>> >> >> >>> cannot really make a qualified decision in some cases.
>> >> >> >>>   - ^ the above goes along the same reasoning as why we don't support
>> >> >> >>> reactive mode with the session cluster (multi-job scheduling).
>> >> >> >>> - The re-scaling decision logic most likely needs to be pluggable for
>> >> >> >>> the above reasons.
>> >> >> >>>   - We're in general fairly concerned about running any user code in
>> >> >> >>> the JM for stability reasons.
>> >> >> >>>   - The most flexible option would be allowing the desired parallelism
>> >> >> >>> to be set via the REST API and leaving the scaling decision to an
>> >> >> >>> external process, which could be reused for both standalone and
>> >> >> >>> "active" deployment modes (there is actually a prototype by Till that
>> >> >> >>> allows this [1]).
>> >> >> >>>
>> >> >> >>> How do you intend to make an autoscaling decision? Also note that the
>> >> >> >>> re-scaling is still a fairly expensive operation (especially with
>> >> >> >>> large state), so you need to make sure the autoscaler doesn't
>> >> >> >>> oscillate and doesn't re-scale too often (this is also something that
>> >> >> >>> could vary from workload to workload).
>> >> >> >>>
>> >> >> >>> A note on the metrics question with an auto-scaler living in the JM:
>> >> >> >>> - We shouldn't really collect the metrics into the JM; instead the JM
>> >> >> >>> can pull them from the TMs directly on demand (basically the same
>> >> >> >>> thing an external auto-scaler would do).
>> >> >> >>>
>> >> >> >>> Looking forward to your thoughts
>> >> >> >>>
>> >> >> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>> >> >> >>>
>> >> >> >>> Best,
>> >> >> >>> D.
>> >> >> >>>
>> >> >> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>> >> >> >>> tuya...@paloaltonetworks.com>
>> >> >> >>> wrote:
>> >> >> >>>
>> >> >> >>> > Hi,
>> >> >> >>> > I am working on autoscaling support for native deployments. Today
>> >> >> >>> > Flink provides reactive mode; however, it only runs on standalone
>> >> >> >>> > deployments. We use Kubernetes native deployment, so I want to
>> >> >> >>> > increase or decrease job resources for our streaming jobs. The
>> >> >> >>> > recent FLIP-138 and FLIP-160 are very useful for achieving this
>> >> >> >>> > goal. I started reading the code of the Flink JobManager,
>> >> >> >>> > AdaptiveScheduler, DeclarativeSlotPool, etc.
>> >> >> >>> >
>> >> >> >>> > My assumption is that the required resources will be calculated in
>> >> >> >>> > the AdaptiveScheduler whenever the scheduler receives a heartbeat
>> >> >> >>> > from a task manager, by calling the
>> >> >> >>> > public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
>> >> >> >>> > method.
>> >> >> >>> >
>> >> >> >>> > I checked the TaskExecutorToJobManagerHeartbeatPayload class;
>> >> >> >>> > however, I only see *accumulatorReport* and
>> >> >> >>> > *executionDeploymentReport*. Do you have any suggestions for
>> >> >> >>> > collecting metrics from TaskManagers? Should I add metrics to
>> >> >> >>> > TaskExecutorToJobManagerHeartbeatPayload?
>> >> >> >>> >
>> >> >> >>> > I am open to other suggestions for this. Whenever I finalize my
>> >> >> >>> > investigation, I will create a FLIP for a more detailed
>> >> >> >>> > implementation.
>> >> >> >>> >
>> >> >> >>> > Thanks for your help in advance.
>> >> >> >>> > Talat
>> >> >> >>> >
>> >> >> >>>
>> >> >> >>
>> >> >>
>> >> >
>> >>
>> >
>>
>
