Thanks for the interesting discussion.

Compared with reactive mode, leveraging the flink-kubernetes-operator to do
the job restarting/upgrading is another solution for auto-scaling.
Given that fully restarting a Flink application on K8s is not too slow,
this is a reasonable way.
Really hope we could get some progress in such area.

Best,
Yang

Gyula Fóra <gyula.f...@gmail.com> 于2022年5月25日周三 09:04写道:

> Hi Talat!
>
> It would be great to have a HPA that works based on some flink
> throughput/backlog metrics. I wonder how you are going to access the Flink
> metrics in the HPA, we might need some integration with the k8s metrics
> system.
> In any case whether we need a FLIP or not depends on the complexity, if
> it's simple then we can go without a FLIP.
>
> Cheers,
> Gyula
>
> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
> tuya...@paloaltonetworks.com>
> wrote:
>
> > Hi Gyula,
> >
> > This seems very promising for initial scaling. We are using Flink
> > Kubernetes Operators. Most probably we are very early adapters for it :)
> > Let me try it. Get back to you soon.
> >
> > My plan is building a general purpose CPU and backlog/throughput base
> > autoscaling for Flink. I can create a Custom Open Source HPA on top of
> your
> > changes. Do I need to create a FLIP for it ?
> >
> > Just general information about us Today we use another execution env.  if
> > the Job scheduler does not support autoscaling. Having a HPA works if
> your
> > sources are well balanced. If there is uneven distribution on sources,
> > Having auto scaling feature on scheduler can help better utilization. But
> > this is not urgent. We can start using your PR at least for a while.
> >
> > Thanks
> >
> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> >> Hi Talat!
> >>
> >> One other approach that we are investigating currently is combining the
> Flink
> >> Kubernetes Operator
> >> <
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$>
> with
> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
> >>
> >> In this approach the HPA monitors the Taskmanager pods directly and can
> >> modify the FlinkDeployment resource replica number to trigger a stateful
> >> job scale-up/down through the operator.
> >> Obviously not as nice as the reactive mode but it works with the current
> >> Kubernetes Native implementation easily. It is also theoretically
> possible
> >> to integrate this with other custom Flink metrics but we haven't tested
> yet.
> >>
> >> I have a created a POC pull request that showcases these capabilities:
> >> https://github.com/apache/flink-kubernetes-operator/pull/227
> >> <
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
> >
> >>
> >> If you are interested it would be nice if you could check it out and
> >> provide feedback, we will get back to refining this after our current
> >> ongoing release.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org> wrote:
> >>
> >>> Hi Talat,
> >>>
> >>> This is definitely an interesting and rather complex topic.
> >>>
> >>> Few unstructured thoughts / notes / questions:
> >>>
> >>> - The main struggle has always been that it's hard to come up with a
> >>> generic one-size-fits-it-all metrics for autoscaling.
> >>>   - Flink doesn't have knowledge of the external environment (eg.
> >>> capacity
> >>> planning on the cluster, no notion of pre-emption), so it can not
> really
> >>> make a qualified decision in some cases.
> >>>   - ^ the above goes along the same reasoning as why we don't support
> >>> reactive mode with the session cluster (multi-job scheduling)
> >>> - The re-scaling decision logic most likely needs to be pluggable from
> >>> the
> >>> above reasons
> >>>   - We're in general fairly concerned about running any user code in JM
> >>> for
> >>> stability reasons.
> >>>   - The most flexible option would be allowing to set the desired
> >>> parallelism via rest api and leave the scaling decision to an external
> >>> process, which could be reused for both standalone and "active"
> >>> deployment
> >>> modes (there is actually a prototype by Till, that allows this [1])
> >>>
> >>> How do you intend to make an autoscaling decision? Also note that the
> >>> re-scaling is still a fairly expensive operation (especially with large
> >>> state), so you need to make sure autoscaler doesn't oscillate and
> doesn't
> >>> re-scale too often (this is also something that could vary from
> workload
> >>> to
> >>> workload).
> >>>
> >>> Note on the metrics question with an auto-scaler living in the JM:
> >>> - We shouldn't really collect the metrics into the JM, but instead JM
> can
> >>> pull then from TMs directly on-demand (basically the same thing and
> >>> external auto-scaler would do).
> >>>
> >>> Looking forward to your thoughts
> >>>
> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
> >>> <
> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
> >
> >>>
> >>> Best,
> >>> D.
> >>>
> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
> >>> tuya...@paloaltonetworks.com>
> >>> wrote:
> >>>
> >>> > Hi,
> >>> > I am working on auto scaling support for native deployments. Today
> >>> Flink
> >>> > provides Reactive mode however it only runs on standalone
> deployments.
> >>> We
> >>> > use Kubernetes native deployment. So I want to increase or decrease
> job
> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are
> very
> >>> > useful to achieve this goal. I started reading code of Flink
> >>> JobManager,
> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
> >>> >
> >>> > My assumption is Required Resources will be calculated on
> >>> AdaptiveScheduler
> >>> > whenever the scheduler receives a heartbeat from a task manager by
> >>> calling
> >>> > public void updateAccumulators(AccumulatorSnapshot
> accumulatorSnapshot)
> >>> > method.
> >>> >
> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I
> >>> only see
> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you have any
> >>> > suggestions to collect metrics from TaskManagers ? Should I add
> >>> metrics on
> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
> >>> >
> >>> > I am open to another suggestion for this. Whenever I finalize my
> >>> > investigation. I will create a FLIP for more detailed implementation.
> >>> >
> >>> > Thanks for your help in advance.
> >>> > Talat
> >>> >
> >>>
> >>
>

Reply via email to