>
> I thought we could enable Adaptive Scheduler, so adding or removing a task
> manager is the same as restarting a job when we use an adaptive scheduler.
> Do I miss anything ?


It is true for standalone mode since adding/removing a TaskManager pod is
fully controlled by users(or external tools).
But it is not valid for native K8s integration[1]. Currently, we could not
dynamically change the TaskManager pods once the job is running.

I really hope the HPA could work for both standalone and native mode.


[1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html

Best,
Yang

Gyula Fóra <gyula.f...@gmail.com> 于2022年5月30日周一 12:23写道:

> Hi Talat!
>
> Sorry for the late reply, I have been busy with some fixes for the release
> and travelling.
>
> I think the prometheus metrics integration sounds like a great idea that
> would cover the needs of most users.
> This way users can also integrate easily with the custom Flink metrics too.
>
> maxReplicas: We could add this easily to the taskManager resource specs
>
> Nice workflow picture, I would love to include this in the docs later. One
> minor comment, should the HPA be outside of the FlinkDeployment box?
>
> Cheers,
> Gyula
>
> On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <tuya...@paloaltonetworks.com>
> wrote:
>
>> Hi Yang,
>>
>> I thought we could enable Adaptive Scheduler, so adding or removing a task
>> manager is the same as restarting a job when we use an adaptive scheduler.
>> Do I miss anything ?
>>
>> Thanks
>>
>> On Tue, May 24, 2022 at 8:16 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> > Thanks for the interesting discussion.
>> >
>> > Compared with reactive mode, leveraging the flink-kubernetes-operator to
>> > do the job restarting/upgrading is another solution for auto-scaling.
>> > Given that fully restarting a Flink application on K8s is not too slow,
>> > this is a reasonable way.
>> > Really hope we could get some progress in such area.
>> >
>> > Best,
>> > Yang
>> >
>> > Gyula Fóra <gyula.f...@gmail.com> 于2022年5月25日周三 09:04写道:
>> >
>> >> Hi Talat!
>> >>
>> >> It would be great to have a HPA that works based on some flink
>> >> throughput/backlog metrics. I wonder how you are going to access the
>> Flink
>> >> metrics in the HPA, we might need some integration with the k8s metrics
>> >> system.
>> >> In any case whether we need a FLIP or not depends on the complexity, if
>> >> it's simple then we can go without a FLIP.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
>> >> tuya...@paloaltonetworks.com>
>> >> wrote:
>> >>
>> >> > Hi Gyula,
>> >> >
>> >> > This seems very promising for initial scaling. We are using Flink
>> >> > Kubernetes Operators. Most probably we are very early adapters for
>> it :)
>> >> > Let me try it. Get back to you soon.
>> >> >
>> >> > My plan is building a general purpose CPU and backlog/throughput base
>> >> > autoscaling for Flink. I can create a Custom Open Source HPA on top
>> of
>> >> your
>> >> > changes. Do I need to create a FLIP for it ?
>> >> >
>> >> > Just general information about us Today we use another execution env.
>> >> if
>> >> > the Job scheduler does not support autoscaling. Having a HPA works if
>> >> your
>> >> > sources are well balanced. If there is uneven distribution on
>> sources,
>> >> > Having auto scaling feature on scheduler can help better utilization.
>> >> But
>> >> > this is not urgent. We can start using your PR at least for a while.
>> >> >
>> >> > Thanks
>> >> >
>> >> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com>
>> >> wrote:
>> >> >
>> >> >> Hi Talat!
>> >> >>
>> >> >> One other approach that we are investigating currently is combining
>> >> the Flink
>> >> >> Kubernetes Operator
>> >> >> <
>> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$
>> >
>> >> with
>> >> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
>> >> >>
>> >> >> In this approach the HPA monitors the Taskmanager pods directly and
>> can
>> >> >> modify the FlinkDeployment resource replica number to trigger a
>> >> stateful
>> >> >> job scale-up/down through the operator.
>> >> >> Obviously not as nice as the reactive mode but it works with the
>> >> current
>> >> >> Kubernetes Native implementation easily. It is also theoretically
>> >> possible
>> >> >> to integrate this with other custom Flink metrics but we haven't
>> >> tested yet.
>> >> >>
>> >> >> I have a created a POC pull request that showcases these
>> capabilities:
>> >> >> https://github.com/apache/flink-kubernetes-operator/pull/227
>> >> <
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6b-_3CvRI$
>> >
>> >> >> <
>> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
>> >> >
>> >> >>
>> >> >> If you are interested it would be nice if you could check it out and
>> >> >> provide feedback, we will get back to refining this after our
>> current
>> >> >> ongoing release.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org>
>> >> wrote:
>> >> >>
>> >> >>> Hi Talat,
>> >> >>>
>> >> >>> This is definitely an interesting and rather complex topic.
>> >> >>>
>> >> >>> Few unstructured thoughts / notes / questions:
>> >> >>>
>> >> >>> - The main struggle has always been that it's hard to come up with
>> a
>> >> >>> generic one-size-fits-it-all metrics for autoscaling.
>> >> >>>   - Flink doesn't have knowledge of the external environment (eg.
>> >> >>> capacity
>> >> >>> planning on the cluster, no notion of pre-emption), so it can not
>> >> really
>> >> >>> make a qualified decision in some cases.
>> >> >>>   - ^ the above goes along the same reasoning as why we don't
>> support
>> >> >>> reactive mode with the session cluster (multi-job scheduling)
>> >> >>> - The re-scaling decision logic most likely needs to be pluggable
>> from
>> >> >>> the
>> >> >>> above reasons
>> >> >>>   - We're in general fairly concerned about running any user code
>> in
>> >> JM
>> >> >>> for
>> >> >>> stability reasons.
>> >> >>>   - The most flexible option would be allowing to set the desired
>> >> >>> parallelism via rest api and leave the scaling decision to an
>> external
>> >> >>> process, which could be reused for both standalone and "active"
>> >> >>> deployment
>> >> >>> modes (there is actually a prototype by Till, that allows this [1])
>> >> >>>
>> >> >>> How do you intend to make an autoscaling decision? Also note that
>> the
>> >> >>> re-scaling is still a fairly expensive operation (especially with
>> >> large
>> >> >>> state), so you need to make sure autoscaler doesn't oscillate and
>> >> doesn't
>> >> >>> re-scale too often (this is also something that could vary from
>> >> workload
>> >> >>> to
>> >> >>> workload).
>> >> >>>
>> >> >>> Note on the metrics question with an auto-scaler living in the JM:
>> >> >>> - We shouldn't really collect the metrics into the JM, but instead
>> JM
>> >> can
>> >> >>> pull then from TMs directly on-demand (basically the same thing and
>> >> >>> external auto-scaler would do).
>> >> >>>
>> >> >>> Looking forward to your thoughts
>> >> >>>
>> >> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>> >> <
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6bc0Id0Zg$
>> >
>> >> >>> <
>> >>
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
>> >> >
>> >> >>>
>> >> >>> Best,
>> >> >>> D.
>> >> >>>
>> >> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>> >> >>> tuya...@paloaltonetworks.com>
>> >> >>> wrote:
>> >> >>>
>> >> >>> > Hi,
>> >> >>> > I am working on auto scaling support for native deployments.
>> Today
>> >> >>> Flink
>> >> >>> > provides Reactive mode however it only runs on standalone
>> >> deployments.
>> >> >>> We
>> >> >>> > use Kubernetes native deployment. So I want to increase or
>> decrease
>> >> job
>> >> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are
>> >> very
>> >> >>> > useful to achieve this goal. I started reading code of Flink
>> >> >>> JobManager,
>> >> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>> >> >>> >
>> >> >>> > My assumption is Required Resources will be calculated on
>> >> >>> AdaptiveScheduler
>> >> >>> > whenever the scheduler receives a heartbeat from a task manager
>> by
>> >> >>> calling
>> >> >>> > public void updateAccumulators(AccumulatorSnapshot
>> >> accumulatorSnapshot)
>> >> >>> > method.
>> >> >>> >
>> >> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however
>> I
>> >> >>> only see
>> >> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you have
>> >> any
>> >> >>> > suggestions to collect metrics from TaskManagers ? Should I add
>> >> >>> metrics on
>> >> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
>> >> >>> >
>> >> >>> > I am open to another suggestion for this. Whenever I finalize my
>> >> >>> > investigation. I will create a FLIP for more detailed
>> >> implementation.
>> >> >>> >
>> >> >>> > Thanks for your help in advance.
>> >> >>> > Talat
>> >> >>> >
>> >> >>>
>> >> >>
>> >>
>> >
>>
>

Reply via email to