Hi Talat! Sorry for the late reply, I have been busy with some fixes for the release and travelling.
I think the Prometheus metrics integration sounds like a great idea that would cover the needs of most users. This way users can also integrate easily with the custom Flink metrics.

maxReplicas: We could add this easily to the taskManager resource specs.

Nice workflow picture, I would love to include this in the docs later. One minor comment: should the HPA be outside of the FlinkDeployment box?

Cheers,
Gyula

On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

> Hi Yang,
>
> I thought we could enable Adaptive Scheduler, so adding or removing a task
> manager is the same as restarting a job when we use an adaptive scheduler.
> Am I missing anything?
>
> Thanks
>
> On Tue, May 24, 2022 at 8:16 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
> > Thanks for the interesting discussion.
> >
> > Compared with reactive mode, leveraging the flink-kubernetes-operator to
> > do the job restarting/upgrading is another solution for auto-scaling.
> > Given that fully restarting a Flink application on K8s is not too slow,
> > this is a reasonable way.
> > Really hope we could get some progress in this area.
> >
> > Best,
> > Yang
> >
> > On Wed, May 25, 2022 at 09:04, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> >> Hi Talat!
> >>
> >> It would be great to have a HPA that works based on some Flink
> >> throughput/backlog metrics. I wonder how you are going to access the
> >> Flink metrics in the HPA, we might need some integration with the k8s
> >> metrics system.
> >> In any case whether we need a FLIP or not depends on the complexity, if
> >> it's simple then we can go without a FLIP.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
> >>
> >> > Hi Gyula,
> >> >
> >> > This seems very promising for initial scaling. We are using the Flink
> >> > Kubernetes Operator. Most probably we are very early adopters of it :)
> >> > Let me try it. I will get back to you soon.
> >> >
> >> > My plan is to build general-purpose CPU and backlog/throughput based
> >> > autoscaling for Flink. I can create a custom open source HPA on top of
> >> > your changes. Do I need to create a FLIP for it?
> >> >
> >> > Just general information about us: today we use another execution env.
> >> > if the Job scheduler does not support autoscaling. Having a HPA works if
> >> > your sources are well balanced. If there is uneven distribution on
> >> > sources, having an autoscaling feature in the scheduler can help with
> >> > better utilization. But this is not urgent. We can start using your PR
> >> > at least for a while.
> >> >
> >> > Thanks
> >> >
> >> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >> >
> >> >> Hi Talat!
> >> >>
> >> >> One other approach that we are investigating currently is combining the
> >> >> Flink Kubernetes Operator
> >> >> <https://github.com/apache/flink-kubernetes-operator> with
> >> >> the K8S scaling capabilities (Horizontal Pod Autoscaler).
> >> >>
> >> >> In this approach the HPA monitors the TaskManager pods directly and can
> >> >> modify the FlinkDeployment resource replica number to trigger a stateful
> >> >> job scale-up/down through the operator.
> >> >> Obviously not as nice as the reactive mode but it works with the current
> >> >> Kubernetes Native implementation easily. It is also theoretically
> >> >> possible to integrate this with other custom Flink metrics but we
> >> >> haven't tested it yet.
> >> >>
> >> >> I have created a POC pull request that showcases these capabilities:
> >> >> https://github.com/apache/flink-kubernetes-operator/pull/227
> >> >>
> >> >> If you are interested it would be nice if you could check it out and
> >> >> provide feedback, we will get back to refining this after our current
> >> >> ongoing release.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org> wrote:
> >> >>
> >> >>> Hi Talat,
> >> >>>
> >> >>> This is definitely an interesting and rather complex topic.
> >> >>>
> >> >>> A few unstructured thoughts / notes / questions:
> >> >>>
> >> >>> - The main struggle has always been that it's hard to come up with a
> >> >>>   generic one-size-fits-all metric for autoscaling.
> >> >>>   - Flink doesn't have knowledge of the external environment (e.g.
> >> >>>     capacity planning on the cluster, no notion of pre-emption), so it
> >> >>>     cannot really make a qualified decision in some cases.
> >> >>>   - ^ the above goes along the same reasoning as why we don't support
> >> >>>     reactive mode with the session cluster (multi-job scheduling)
> >> >>> - The re-scaling decision logic most likely needs to be pluggable for
> >> >>>   the above reasons
> >> >>>   - We're in general fairly concerned about running any user code in
> >> >>>     the JM for stability reasons.
> >> >>> - The most flexible option would be to allow setting the desired
> >> >>>   parallelism via the REST API and leave the scaling decision to an
> >> >>>   external process, which could be reused for both standalone and
> >> >>>   "active" deployment modes (there is actually a prototype by Till
> >> >>>   that allows this [1])
> >> >>>
> >> >>> How do you intend to make an autoscaling decision? Also note that
> >> >>> re-scaling is still a fairly expensive operation (especially with large
> >> >>> state), so you need to make sure the autoscaler doesn't oscillate and
> >> >>> doesn't re-scale too often (this is also something that could vary from
> >> >>> workload to workload).
> >> >>>
> >> >>> Note on the metrics question with an auto-scaler living in the JM:
> >> >>> - We shouldn't really collect the metrics into the JM, but instead the
> >> >>>   JM can pull them from TMs directly on-demand (basically the same thing
> >> >>>   an external auto-scaler would do).
> >> >>>
> >> >>> Looking forward to your thoughts
> >> >>>
> >> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
> >> >>>
> >> >>> Best,
> >> >>> D.
> >> >>>
> >> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
> >> >>>
> >> >>> > Hi,
> >> >>> > I am working on auto scaling support for native deployments. Today
> >> >>> > Flink provides Reactive mode, however it only runs on standalone
> >> >>> > deployments. We use Kubernetes native deployment.
> >> >>> > So I want to increase or decrease job resources for our streaming
> >> >>> > jobs. The recent FLIP-138 and FLIP-160 are very useful to achieve this
> >> >>> > goal. I started reading the code of Flink JobManager,
> >> >>> > AdaptiveScheduler, DeclarativeSlotPool etc.
> >> >>> >
> >> >>> > My assumption is that Required Resources will be calculated on
> >> >>> > AdaptiveScheduler whenever the scheduler receives a heartbeat from a
> >> >>> > task manager by calling the
> >> >>> > public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
> >> >>> > method.
> >> >>> >
> >> >>> > I checked the TaskExecutorToJobManagerHeartbeatPayload class, however I
> >> >>> > only see *accumulatorReport* and *executionDeploymentReport*. Do you
> >> >>> > have any suggestions to collect metrics from TaskManagers? Should I add
> >> >>> > metrics on TaskExecutorToJobManagerHeartbeatPayload?
> >> >>> >
> >> >>> > I am open to other suggestions for this. Once I finalize my
> >> >>> > investigation, I will create a FLIP with a more detailed
> >> >>> > implementation.
> >> >>> >
> >> >>> > Thanks for your help in advance.
> >> >>> > Talat
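
For illustration only, here is a rough sketch of the kind of external scaling decision discussed in this thread: a backlog-based replica count that is clamped to a maximum and guarded against oscillation. It borrows the proportional rule that the Kubernetes HPA documents (desired = ceil(current * currentMetric / targetMetric)) and adds a cooldown, since re-scaling a stateful job is expensive. Everything named here (`ScalingPolicy`, `desired_replicas`, the backlog-per-TaskManager metric, the default values) is hypothetical and not part of Flink, the operator, or the POC pull request.

```python
import math
from dataclasses import dataclass


@dataclass
class ScalingPolicy:
    # Hypothetical target: pending records each TaskManager should hold.
    target_backlog_per_tm: float
    min_replicas: int = 1
    max_replicas: int = 10
    # Ignore deviations from the target that are smaller than this fraction.
    tolerance: float = 0.1
    # Minimum time between two rescales, to avoid oscillation.
    cooldown_seconds: float = 600.0


def desired_replicas(
    current_replicas: int,
    backlog_per_tm: float,
    seconds_since_last_rescale: float,
    policy: ScalingPolicy,
) -> int:
    """Return the TaskManager replica count an external autoscaler would request."""
    # A recent rescale means the job may still be recovering: do nothing.
    if seconds_since_last_rescale < policy.cooldown_seconds:
        return current_replicas

    ratio = backlog_per_tm / policy.target_backlog_per_tm
    # Close enough to the target: not worth a stateful restart.
    if abs(ratio - 1.0) <= policy.tolerance:
        return current_replicas

    # Proportional rule, then clamp to the configured bounds.
    proposed = math.ceil(current_replicas * ratio)
    return max(policy.min_replicas, min(policy.max_replicas, proposed))


if __name__ == "__main__":
    policy = ScalingPolicy(target_backlog_per_tm=1000.0, max_replicas=8)
    print(desired_replicas(4, 2000.0, 900.0, policy))
```

The result of such a function would then be written to wherever the replica count lives (for example the FlinkDeployment resource, or a REST call if the parallelism becomes settable that way); that part is deliberately left out because it depends on which of the approaches above is chosen.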