Hi Gyula, I did some investigation. Kubernetes developers suggest not using the kubernetes metric system for application specific metrics. [1] Currently only possible workflow is over prometheus. Prometheus is used widely on Kubernetes deployments. Kubernetes metrics provides Custom Metrics API, each task manager exposes its own metrics and HPA uses those metrics to make autoscaling decisions. Google Dataflow uses Source metrics [2] We need to get maxReplica count for a Flink Job. I drow in general workflow in here [3]
What are your thoughts on this? Talat [1] https://github.com/kubernetes-sigs/metrics-server#use-cases [2] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#custom-unbounded-sources [3] https://docs.google.com/drawings/d/1m_SjabXN-EX-0R9zbbS9cXxH3JV2r257K6aRG3Zyd4Q/edit?usp=sharing On Tue, May 24, 2022 at 6:02 PM Gyula Fóra <gyula.f...@gmail.com> wrote: > Hi Talat! > > It would be great to have a HPA that works based on some flink > throughput/backlog metrics. I wonder how you are going to access the Flink > metrics in the HPA, we might need some integration with the k8s metrics > system. > In any case whether we need a FLIP or not depends on the complexity, if > it's simple then we can go without a FLIP. > > Cheers, > Gyula > > On Tue, May 24, 2022 at 12:26 PM Talat Uyarer < > tuya...@paloaltonetworks.com> wrote: > >> Hi Gyula, >> >> This seems very promising for initial scaling. We are using Flink >> Kubernetes Operators. Most probably we are very early adapters for it :) >> Let me try it. Get back to you soon. >> >> My plan is building a general purpose CPU and backlog/throughput base >> autoscaling for Flink. I can create a Custom Open Source HPA on top of your >> changes. Do I need to create a FLIP for it ? >> >> Just general information about us Today we use another execution env. if >> the Job scheduler does not support autoscaling. Having a HPA works if your >> sources are well balanced. If there is uneven distribution on sources, >> Having auto scaling feature on scheduler can help better utilization. But >> this is not urgent. We can start using your PR at least for a while. >> >> Thanks >> >> On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com> wrote: >> >>> Hi Talat! >>> >>> One other approach that we are investigating currently is combining the >>> Flink >>> Kubernetes Operator >>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$> >>> with >>> the K8S scaling capabilities (Horizontal Pod autoscaler) >>> >>> In this approach the HPA monitors the Taskmanager pods directly and can >>> modify the FlinkDeployment resource replica number to trigger a stateful >>> job scale-up/down through the operator. >>> Obviously not as nice as the reactive mode but it works with the current >>> Kubernetes Native implementation easily. It is also theoretically possible >>> to integrate this with other custom Flink metrics but we haven't tested yet. >>> >>> I have a created a POC pull request that showcases these capabilities: >>> https://github.com/apache/flink-kubernetes-operator/pull/227 >>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$> >>> >>> If you are interested it would be nice if you could check it out and >>> provide feedback, we will get back to refining this after our current >>> ongoing release. >>> >>> Cheers, >>> Gyula >>> >>> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org> wrote: >>> >>>> Hi Talat, >>>> >>>> This is definitely an interesting and rather complex topic. >>>> >>>> Few unstructured thoughts / notes / questions: >>>> >>>> - The main struggle has always been that it's hard to come up with a >>>> generic one-size-fits-it-all metrics for autoscaling. >>>> - Flink doesn't have knowledge of the external environment (eg. >>>> capacity >>>> planning on the cluster, no notion of pre-emption), so it can not really >>>> make a qualified decision in some cases. >>>> - ^ the above goes along the same reasoning as why we don't support >>>> reactive mode with the session cluster (multi-job scheduling) >>>> - The re-scaling decision logic most likely needs to be pluggable from >>>> the >>>> above reasons >>>> - We're in general fairly concerned about running any user code in JM >>>> for >>>> stability reasons. >>>> - The most flexible option would be allowing to set the desired >>>> parallelism via rest api and leave the scaling decision to an external >>>> process, which could be reused for both standalone and "active" >>>> deployment >>>> modes (there is actually a prototype by Till, that allows this [1]) >>>> >>>> How do you intend to make an autoscaling decision? Also note that the >>>> re-scaling is still a fairly expensive operation (especially with large >>>> state), so you need to make sure autoscaler doesn't oscillate and >>>> doesn't >>>> re-scale too often (this is also something that could vary from >>>> workload to >>>> workload). >>>> >>>> Note on the metrics question with an auto-scaler living in the JM: >>>> - We shouldn't really collect the metrics into the JM, but instead JM >>>> can >>>> pull then from TMs directly on-demand (basically the same thing and >>>> external auto-scaler would do). >>>> >>>> Looking forward to your thoughts >>>> >>>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling >>>> <https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$> >>>> >>>> Best, >>>> D. >>>> >>>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer < >>>> tuya...@paloaltonetworks.com> >>>> wrote: >>>> >>>> > Hi, >>>> > I am working on auto scaling support for native deployments. Today >>>> Flink >>>> > provides Reactive mode however it only runs on standalone >>>> deployments. We >>>> > use Kubernetes native deployment. So I want to increase or decrease >>>> job >>>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are very >>>> > useful to achieve this goal. I started reading code of Flink >>>> JobManager, >>>> > AdaptiveScheduler and DeclarativeSlotPool etc. >>>> > >>>> > My assumption is Required Resources will be calculated on >>>> AdaptiveScheduler >>>> > whenever the scheduler receives a heartbeat from a task manager by >>>> calling >>>> > public void updateAccumulators(AccumulatorSnapshot >>>> accumulatorSnapshot) >>>> > method. >>>> > >>>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I >>>> only see >>>> > *accumulatorReport* and *executionDeploymentReport* . Do you have any >>>> > suggestions to collect metrics from TaskManagers ? Should I add >>>> metrics on >>>> > TaskExecutorToJobManagerHeartbeatPayload ? >>>> > >>>> > I am open to another suggestion for this. Whenever I finalize my >>>> > investigation. I will create a FLIP for more detailed implementation. >>>> > >>>> > Thanks for your help in advance. >>>> > Talat >>>> > >>>> >>>