Hi Yang,

I thought we could enable Adaptive Scheduler, so adding or removing a task
manager is the same as restarting a job when we use an adaptive scheduler.
Do I miss anything ?

Thanks

On Tue, May 24, 2022 at 8:16 PM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks for the interesting discussion.
>
> Compared with reactive mode, leveraging the flink-kubernetes-operator to
> do the job restarting/upgrading is another solution for auto-scaling.
> Given that fully restarting a Flink application on K8s is not too slow,
> this is a reasonable way.
> Really hope we could get some progress in such area.
>
> Best,
> Yang
>
> Gyula Fóra <gyula.f...@gmail.com> 于2022年5月25日周三 09:04写道:
>
>> Hi Talat!
>>
>> It would be great to have a HPA that works based on some flink
>> throughput/backlog metrics. I wonder how you are going to access the Flink
>> metrics in the HPA, we might need some integration with the k8s metrics
>> system.
>> In any case whether we need a FLIP or not depends on the complexity, if
>> it's simple then we can go without a FLIP.
>>
>> Cheers,
>> Gyula
>>
>> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com>
>> wrote:
>>
>> > Hi Gyula,
>> >
>> > This seems very promising for initial scaling. We are using Flink
>> > Kubernetes Operators. Most probably we are very early adapters for it :)
>> > Let me try it. Get back to you soon.
>> >
>> > My plan is building a general purpose CPU and backlog/throughput base
>> > autoscaling for Flink. I can create a Custom Open Source HPA on top of
>> your
>> > changes. Do I need to create a FLIP for it ?
>> >
>> > Just general information about us Today we use another execution env.
>> if
>> > the Job scheduler does not support autoscaling. Having a HPA works if
>> your
>> > sources are well balanced. If there is uneven distribution on sources,
>> > Having auto scaling feature on scheduler can help better utilization.
>> But
>> > this is not urgent. We can start using your PR at least for a while.
>> >
>> > Thanks
>> >
>> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com>
>> wrote:
>> >
>> >> Hi Talat!
>> >>
>> >> One other approach that we are investigating currently is combining
>> the Flink
>> >> Kubernetes Operator
>> >> <
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$>
>> with
>> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
>> >>
>> >> In this approach the HPA monitors the Taskmanager pods directly and can
>> >> modify the FlinkDeployment resource replica number to trigger a
>> stateful
>> >> job scale-up/down through the operator.
>> >> Obviously not as nice as the reactive mode but it works with the
>> current
>> >> Kubernetes Native implementation easily. It is also theoretically
>> possible
>> >> to integrate this with other custom Flink metrics but we haven't
>> tested yet.
>> >>
>> >> I have a created a POC pull request that showcases these capabilities:
>> >> https://github.com/apache/flink-kubernetes-operator/pull/227
>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6b-_3CvRI$>
>> >> <
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
>> >
>> >>
>> >> If you are interested it would be nice if you could check it out and
>> >> provide feedback, we will get back to refining this after our current
>> >> ongoing release.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org>
>> wrote:
>> >>
>> >>> Hi Talat,
>> >>>
>> >>> This is definitely an interesting and rather complex topic.
>> >>>
>> >>> Few unstructured thoughts / notes / questions:
>> >>>
>> >>> - The main struggle has always been that it's hard to come up with a
>> >>> generic one-size-fits-it-all metrics for autoscaling.
>> >>>   - Flink doesn't have knowledge of the external environment (eg.
>> >>> capacity
>> >>> planning on the cluster, no notion of pre-emption), so it can not
>> really
>> >>> make a qualified decision in some cases.
>> >>>   - ^ the above goes along the same reasoning as why we don't support
>> >>> reactive mode with the session cluster (multi-job scheduling)
>> >>> - The re-scaling decision logic most likely needs to be pluggable from
>> >>> the
>> >>> above reasons
>> >>>   - We're in general fairly concerned about running any user code in
>> JM
>> >>> for
>> >>> stability reasons.
>> >>>   - The most flexible option would be allowing to set the desired
>> >>> parallelism via rest api and leave the scaling decision to an external
>> >>> process, which could be reused for both standalone and "active"
>> >>> deployment
>> >>> modes (there is actually a prototype by Till, that allows this [1])
>> >>>
>> >>> How do you intend to make an autoscaling decision? Also note that the
>> >>> re-scaling is still a fairly expensive operation (especially with
>> large
>> >>> state), so you need to make sure autoscaler doesn't oscillate and
>> doesn't
>> >>> re-scale too often (this is also something that could vary from
>> workload
>> >>> to
>> >>> workload).
>> >>>
>> >>> Note on the metrics question with an auto-scaler living in the JM:
>> >>> - We shouldn't really collect the metrics into the JM, but instead JM
>> can
>> >>> pull then from TMs directly on-demand (basically the same thing and
>> >>> external auto-scaler would do).
>> >>>
>> >>> Looking forward to your thoughts
>> >>>
>> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>> <https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6bc0Id0Zg$>
>> >>> <
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
>> >
>> >>>
>> >>> Best,
>> >>> D.
>> >>>
>> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>> >>> tuya...@paloaltonetworks.com>
>> >>> wrote:
>> >>>
>> >>> > Hi,
>> >>> > I am working on auto scaling support for native deployments. Today
>> >>> Flink
>> >>> > provides Reactive mode however it only runs on standalone
>> deployments.
>> >>> We
>> >>> > use Kubernetes native deployment. So I want to increase or decrease
>> job
>> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are
>> very
>> >>> > useful to achieve this goal. I started reading code of Flink
>> >>> JobManager,
>> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>> >>> >
>> >>> > My assumption is Required Resources will be calculated on
>> >>> AdaptiveScheduler
>> >>> > whenever the scheduler receives a heartbeat from a task manager by
>> >>> calling
>> >>> > public void updateAccumulators(AccumulatorSnapshot
>> accumulatorSnapshot)
>> >>> > method.
>> >>> >
>> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I
>> >>> only see
>> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you have
>> any
>> >>> > suggestions to collect metrics from TaskManagers ? Should I add
>> >>> metrics on
>> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
>> >>> >
>> >>> > I am open to another suggestion for this. Whenever I finalize my
>> >>> > investigation. I will create a FLIP for more detailed
>> implementation.
>> >>> >
>> >>> > Thanks for your help in advance.
>> >>> > Talat
>> >>> >
>> >>>
>> >>
>>
>

Reply via email to