Thanks for the interesting discussion. Compared with reactive mode, leveraging the flink-kubernetes-operator to do the job restarting/upgrading is another solution for auto-scaling. Given that fully restarting a Flink application on K8s is not too slow, this is a reasonable approach. I really hope we can make some progress in this area.
Best,
Yang

Gyula Fóra <gyula.f...@gmail.com> wrote on Wed, May 25, 2022 at 09:04:

> Hi Talat!
>
> It would be great to have an HPA that works based on some Flink
> throughput/backlog metrics. I wonder how you are going to access the
> Flink metrics in the HPA; we might need some integration with the k8s
> metrics system.
> In any case, whether we need a FLIP or not depends on the complexity; if
> it's simple then we can go without a FLIP.
>
> Cheers,
> Gyula
>
> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer
> <tuya...@paloaltonetworks.com> wrote:
>
>> Hi Gyula,
>>
>> This seems very promising for initial scaling. We are using the Flink
>> Kubernetes Operator. Most probably we are very early adopters of it :)
>> Let me try it; I'll get back to you soon.
>>
>> My plan is to build a general-purpose CPU and backlog/throughput-based
>> autoscaler for Flink. I can create a custom open-source HPA on top of
>> your changes. Do I need to create a FLIP for it?
>>
>> Just some general information about us: today we use another execution
>> environment if the job scheduler does not support autoscaling. Having
>> an HPA works if your sources are well balanced. If there is an uneven
>> distribution on sources, having an autoscaling feature on the scheduler
>> can help with better utilization. But this is not urgent. We can start
>> using your PR at least for a while.
>>
>> Thanks
>>
>> On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hi Talat!
>>>
>>> One other approach that we are investigating currently is combining
>>> the Flink Kubernetes Operator
>>> <https://github.com/apache/flink-kubernetes-operator>
>>> with the K8s scaling capabilities (Horizontal Pod Autoscaler).
>>>
>>> In this approach the HPA monitors the TaskManager pods directly and
>>> can modify the FlinkDeployment resource replica number to trigger a
>>> stateful job scale-up/down through the operator.
>>> Obviously not as nice as the reactive mode, but it works with the
>>> current Kubernetes Native implementation easily. It is also
>>> theoretically possible to integrate this with other custom Flink
>>> metrics, but we haven't tested that yet.
>>>
>>> I have created a POC pull request that showcases these capabilities:
>>> https://github.com/apache/flink-kubernetes-operator/pull/227
>>>
>>> If you are interested, it would be nice if you could check it out and
>>> provide feedback; we will get back to refining this after our current
>>> ongoing release.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Talat,
>>>>
>>>> This is definitely an interesting and rather complex topic.
>>>>
>>>> A few unstructured thoughts / notes / questions:
>>>>
>>>> - The main struggle has always been that it's hard to come up with
>>>>   generic one-size-fits-all metrics for autoscaling.
>>>> - Flink doesn't have knowledge of the external environment (e.g.
>>>>   capacity planning on the cluster, no notion of pre-emption), so it
>>>>   cannot really make a qualified decision in some cases.
>>>> - ^ The above goes along the same reasoning as why we don't support
>>>>   reactive mode with the session cluster (multi-job scheduling).
>>>> - The re-scaling decision logic most likely needs to be pluggable,
>>>>   for the above reasons.
>>>> - We're in general fairly concerned about running any user code in
>>>>   the JM, for stability reasons.
>>>> - The most flexible option would be to allow setting the desired
>>>>   parallelism via the REST API and leave the scaling decision to an
>>>>   external process, which could be reused for both standalone and
>>>>   "active" deployment modes (there is actually a prototype by Till
>>>>   that allows this [1]).
>>>>
>>>> How do you intend to make an autoscaling decision? Also note that
>>>> re-scaling is still a fairly expensive operation (especially with
>>>> large state), so you need to make sure the autoscaler doesn't
>>>> oscillate and doesn't re-scale too often (this is also something that
>>>> could vary from workload to workload).
>>>>
>>>> A note on the metrics question with an auto-scaler living in the JM:
>>>> - We shouldn't really collect the metrics into the JM; instead the JM
>>>>   can pull them from the TMs directly on-demand (basically the same
>>>>   thing an external auto-scaler would do).
>>>>
>>>> Looking forward to your thoughts.
>>>>
>>>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer
>>>> <tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am working on autoscaling support for native deployments. Today
>>>>> Flink provides reactive mode, however it only runs on standalone
>>>>> deployments. We use Kubernetes native deployment.
>>>>> So I want to increase or decrease job resources for our streaming
>>>>> jobs. The recent FLIP-138 and FLIP-160 are very useful for achieving
>>>>> this goal. I started reading the code of the Flink JobManager,
>>>>> AdaptiveScheduler, DeclarativeSlotPool, etc.
>>>>>
>>>>> My assumption is that required resources will be calculated in the
>>>>> AdaptiveScheduler whenever the scheduler receives a heartbeat from a
>>>>> task manager, by calling the
>>>>> public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
>>>>> method.
>>>>>
>>>>> I checked the TaskExecutorToJobManagerHeartbeatPayload class, however
>>>>> I only see *accumulatorReport* and *executionDeploymentReport*. Do
>>>>> you have any suggestions for collecting metrics from TaskManagers?
>>>>> Should I add metrics to TaskExecutorToJobManagerHeartbeatPayload?
>>>>>
>>>>> I am open to other suggestions for this. Whenever I finalize my
>>>>> investigation, I will create a FLIP for a more detailed
>>>>> implementation.
>>>>>
>>>>> Thanks for your help in advance.
>>>>> Talat
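[Editor's note] To make the external-autoscaler direction discussed in this thread concrete, here is a minimal sketch of the decision logic only: derive a target parallelism from backlog/throughput, guard re-scales with a cooldown plus a hysteresis band so the autoscaler does not oscillate (David's concern above), and shape a replica patch an operator-based scaler might apply to a FlinkDeployment. This is an illustrative assumption, not the operator's or Flink's actual API; the metric names, thresholds, and the `spec.taskManager.replicas` field shape are all hypothetical.

```python
import math
from dataclasses import dataclass


@dataclass
class ScalerState:
    current_parallelism: int
    last_scale_ts: float = 0.0  # epoch seconds of the last re-scale


def target_parallelism(input_rate: float, per_subtask_capacity: float,
                       min_p: int = 1, max_p: int = 64) -> int:
    """Parallelism needed to keep up with the observed input rate.
    Both rates are hypothetical metrics (records/sec)."""
    needed = math.ceil(input_rate / per_subtask_capacity)
    return max(min_p, min(max_p, needed))


def decide(state: ScalerState, input_rate: float, per_subtask_capacity: float,
           now: float, cooldown_s: float = 600.0,
           hysteresis: float = 0.2) -> int:
    """Return the parallelism to run with. Re-scale only if the target is
    outside the +/- hysteresis band around the current parallelism AND the
    cooldown since the last re-scale has elapsed; stateful re-scaling is
    expensive, so both guards matter."""
    target = target_parallelism(input_rate, per_subtask_capacity)
    if now - state.last_scale_ts < cooldown_s:
        return state.current_parallelism
    low = state.current_parallelism * (1 - hysteresis)
    high = state.current_parallelism * (1 + hysteresis)
    if low <= target <= high:
        return state.current_parallelism  # within band: don't oscillate
    state.current_parallelism = target
    state.last_scale_ts = now
    return target


def replica_patch(parallelism: int, slots_per_tm: int) -> dict:
    """Hypothetical shape of a FlinkDeployment patch body; the field
    names are assumptions for illustration only."""
    replicas = math.ceil(parallelism / slots_per_tm)
    return {"spec": {"taskManager": {"replicas": replicas}}}
```

For example, at a current parallelism of 4, an input rate of 1000 rec/s with an assumed 100 rec/s per subtask gives a target of 10, well outside the 20% band, so the scaler moves to 10 and (with 2 slots per TM) asks for 5 TaskManager replicas; immediately afterwards the cooldown holds the parallelism steady even if the rate drops.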