Hi Gyula,

This seems very promising for initial scaling. We are using the Flink Kubernetes Operator; most probably we are very early adopters of it :) Let me try it. I will get back to you soon.
My plan is to build a general-purpose CPU and backlog/throughput based autoscaler for Flink. I can create a custom open source HPA on top of your changes. Do I need to create a FLIP for it?

Just some general information about us: today we use another execution environment if the job scheduler does not support autoscaling. Having an HPA works if your sources are well balanced. If there is uneven distribution on the sources, having an autoscaling feature in the scheduler can give better utilization. But this is not urgent; we can start using your PR at least for a while.

Thanks

On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> Hi Talat!
>
> One other approach that we are investigating currently is combining the Flink
> Kubernetes Operator <https://github.com/apache/flink-kubernetes-operator>
> with the K8s scaling capabilities (Horizontal Pod Autoscaler).
>
> In this approach the HPA monitors the TaskManager pods directly and can
> modify the FlinkDeployment resource replica number to trigger a stateful
> job scale-up/down through the operator.
> Obviously not as nice as reactive mode, but it works easily with the current
> Kubernetes native implementation. It is also theoretically possible to
> integrate this with other custom Flink metrics, but we haven't tested that yet.
>
> I have created a POC pull request that showcases these capabilities:
> https://github.com/apache/flink-kubernetes-operator/pull/227
>
> If you are interested it would be nice if you could check it out and
> provide feedback; we will get back to refining this after our current
> ongoing release.
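[Editor's note: the HPA-over-operator approach described above could be sketched roughly like the manifest below. This is an illustrative assumption, not taken from the POC PR: it presumes the FlinkDeployment CRD exposes a scale subresource driving the replica count, and all names, versions, and thresholds are placeholders.]

```yaml
# Hypothetical HPA targeting a FlinkDeployment's scale subresource.
# apiVersion/kind of the target and all values are illustrative only.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-job-hpa
spec:
  scaleTargetRef:
    apiVersion: flink.apache.org/v1beta1   # assumed operator API version
    kind: FlinkDeployment
    name: my-flink-job                     # placeholder deployment name
  minReplicas: 1
  maxReplicas: 8
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70           # example threshold only
```

The HPA would then adjust the replica count on the FlinkDeployment, and the operator would perform the stateful scale-up/down of the job.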
>
> Cheers,
> Gyula
>
> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Talat,
>>
>> This is definitely an interesting and rather complex topic.
>>
>> A few unstructured thoughts / notes / questions:
>>
>> - The main struggle has always been that it is hard to come up with a
>> generic one-size-fits-all metric for autoscaling.
>> - Flink doesn't have knowledge of the external environment (e.g. capacity
>> planning on the cluster, no notion of pre-emption), so it cannot really
>> make a qualified decision in some cases.
>> - The above goes along the same reasoning as why we don't support
>> reactive mode with the session cluster (multi-job scheduling).
>> - The re-scaling decision logic most likely needs to be pluggable for the
>> above reasons.
>> - We are in general fairly concerned about running any user code in the JM
>> for stability reasons.
>> - The most flexible option would be allowing the desired parallelism to be
>> set via the REST API and leaving the scaling decision to an external
>> process, which could be reused for both the standalone and "active" deployment
>> modes (there is actually a prototype by Till that allows this [1]).
>>
>> How do you intend to make an autoscaling decision? Also note that
>> re-scaling is still a fairly expensive operation (especially with large
>> state), so you need to make sure the autoscaler doesn't oscillate and doesn't
>> re-scale too often (this is also something that can vary from workload to
>> workload).
>>
>> A note on the metrics question with an autoscaler living in the JM:
>> - We shouldn't really collect the metrics into the JM; instead the JM can
>> pull them from the TMs directly on demand (basically the same thing an
>> external autoscaler would do).
>>
>> Looking forward to your thoughts
>>
>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>>
>> Best,
>> D.
>>
>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>
>> > Hi,
>> > I am working on autoscaling support for native deployments. Today Flink
>> > provides reactive mode, however it only runs on standalone deployments. We
>> > use the Kubernetes native deployment, so I want to increase or decrease job
>> > resources for our streaming jobs. The recent FLIP-138 and FLIP-160 are very
>> > useful for achieving this goal. I started reading the code of the Flink
>> > JobManager, AdaptiveScheduler, DeclarativeSlotPool, etc.
>> >
>> > My assumption is that the required resources will be calculated in the
>> > AdaptiveScheduler whenever the scheduler receives a heartbeat from a
>> > task manager, by calling the
>> > public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
>> > method.
>> >
>> > I checked the TaskExecutorToJobManagerHeartbeatPayload class, however I
>> > only see *accumulatorReport* and *executionDeploymentReport*. Do you have
>> > any suggestions for collecting metrics from TaskManagers? Should I add
>> > metrics to TaskExecutorToJobManagerHeartbeatPayload?
>> >
>> > I am open to other suggestions for this. Whenever I finalize my
>> > investigation I will create a FLIP with a more detailed implementation.
>> >
>> > Thanks for your help in advance.
>> > Talat
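[Editor's note: David's warning about oscillation and expensive restarts, together with Talat's backlog/throughput plan, can be sketched as the decision loop below. This is a minimal illustration, not any prototype from the thread; all names, rates, and thresholds are assumptions, and a real autoscaler would read the rates from Flink metrics such as per-source backlog and records-in-per-second.]

```python
def decide_parallelism(current, backlog_per_sec, processed_per_sec,
                       min_p=1, max_p=32, target_utilization=0.8):
    """Suggest a parallelism from backlog growth vs. processing rate.

    Illustrative sketch: capacity needed to drain the backlog and keep
    up with input, scaled to a target utilization, divided by the
    observed per-slot throughput.
    """
    if processed_per_sec <= 0 or current <= 0:
        return current
    required = (backlog_per_sec + processed_per_sec) / target_utilization
    per_slot = processed_per_sec / current
    return max(min_p, min(max_p, round(required / per_slot)))


class Autoscaler:
    """Guards the decision with a cooldown (re-scaling with large state is
    expensive) and a dead band (small fluctuations must not cause
    oscillating restarts)."""

    def __init__(self, cooldown_sec=600, dead_band=1):
        self.cooldown_sec = cooldown_sec
        self.dead_band = dead_band
        self.last_rescale = float("-inf")

    def maybe_rescale(self, now, current, backlog_per_sec, processed_per_sec):
        if now - self.last_rescale < self.cooldown_sec:
            return None  # still cooling down after the last restart
        desired = decide_parallelism(current, backlog_per_sec, processed_per_sec)
        if abs(desired - current) <= self.dead_band:
            return None  # change too small to justify a restart
        self.last_rescale = now
        return desired
```

The returned parallelism would then be pushed to whatever mechanism triggers the actual rescale (e.g. the external-process approach David mentions, or the operator's replica count).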