>> I thought we could enable Adaptive Scheduler, so adding or removing a task manager is the same as restarting a job when we use an adaptive scheduler.
>> Do I miss anything?

It is true for standalone mode, since adding/removing a TaskManager pod is fully controlled by users (or external tools). But it is not valid for the native K8s integration [1]. Currently we cannot dynamically change the TaskManager pods once the job is running. I really hope the HPA could work for both standalone and native mode.

[1] https://flink.apache.org/2021/02/10/native-k8s-with-ha.html

Best,
Yang

On Mon, May 30, 2022 at 12:23, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Talat!
>
> Sorry for the late reply, I have been busy with some fixes for the release and travelling.
>
> I think the Prometheus metrics integration sounds like a great idea that would cover the needs of most users. This way users can also integrate easily with custom Flink metrics too.
>
> maxReplicas: we could add this easily to the taskManager resource specs.
>
> Nice workflow picture, I would love to include it in the docs later. One minor comment: should the HPA be outside of the FlinkDeployment box?
>
> Cheers,
> Gyula
>
> On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>
>> Hi Yang,
>>
>> I thought we could enable Adaptive Scheduler, so adding or removing a task manager is the same as restarting a job when we use an adaptive scheduler. Do I miss anything?
>>
>> Thanks
>>
>> On Tue, May 24, 2022 at 8:16 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>>> Thanks for the interesting discussion.
>>>
>>> Compared with reactive mode, leveraging the flink-kubernetes-operator to do the job restarting/upgrading is another solution for auto-scaling. Given that fully restarting a Flink application on K8s is not too slow, this is a reasonable way. I really hope we can make some progress in this area.
>>>
>>> Best,
>>> Yang
>>>
>>> On Wed, May 25, 2022 at 09:04, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Hi Talat!
>>>>
>>>> It would be great to have an HPA that works based on some Flink throughput/backlog metrics. I wonder how you are going to access the Flink metrics in the HPA; we might need some integration with the K8s metrics system.
>>>> In any case, whether we need a FLIP or not depends on the complexity; if it's simple then we can go without a FLIP.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> This seems very promising for initial scaling. We are using the Flink Kubernetes Operator; most probably we are very early adopters of it :) Let me try it and get back to you soon.
>>>>>
>>>>> My plan is to build a general-purpose, CPU- and backlog/throughput-based autoscaling for Flink. I can create a custom open source HPA on top of your changes. Do I need to create a FLIP for it?
>>>>>
>>>>> Just some general information about us: today we use another execution environment if the job scheduler does not support autoscaling. Having an HPA works if your sources are well balanced. If there is an uneven distribution across sources, having an autoscaling feature in the scheduler can give better utilization. But this is not urgent. We can start using your PR at least for a while.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>
>>>>>> Hi Talat!
>>>>>>
>>>>>> One other approach that we are investigating currently is combining the Flink Kubernetes Operator <https://github.com/apache/flink-kubernetes-operator> with the K8s scaling capabilities (Horizontal Pod Autoscaler).
>>>>>>
>>>>>> In this approach the HPA monitors the TaskManager pods directly and can modify the FlinkDeployment resource replica number to trigger a stateful job scale-up/down through the operator. Obviously not as nice as the reactive mode, but it works easily with the current Kubernetes Native implementation. It is also theoretically possible to integrate this with other custom Flink metrics, but we haven't tested that yet.
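>>>>>>
>>>>>> To make it concrete, a minimal sketch of what such an HPA could look like is below. This is only an illustration, not the exact resource from the POC: the names and thresholds are placeholders, and it assumes the FlinkDeployment CRD exposes a scale subresource for the replica count.
>>>>>>
>>>>>> apiVersion: autoscaling/v2
>>>>>> kind: HorizontalPodAutoscaler
>>>>>> metadata:
>>>>>>   # Placeholder name, to be replaced with the actual deployment name.
>>>>>>   name: basic-example-hpa
>>>>>> spec:
>>>>>>   # Target the FlinkDeployment custom resource rather than the TaskManager
>>>>>>   # pods themselves, so every replica change goes through the operator's
>>>>>>   # stateful upgrade flow.
>>>>>>   scaleTargetRef:
>>>>>>     apiVersion: flink.apache.org/v1beta1
>>>>>>     kind: FlinkDeployment
>>>>>>     name: basic-example
>>>>>>   minReplicas: 1
>>>>>>   maxReplicas: 4
>>>>>>   metrics:
>>>>>>     # Plain CPU utilization of the selected pods; a custom or external
>>>>>>     # metric (e.g. backlog / records lag) could be plugged in here instead.
>>>>>>     - type: Resource
>>>>>>       resource:
>>>>>>         name: cpu
>>>>>>         target:
>>>>>>           type: Utilization
>>>>>>           averageUtilization: 70
>>>>>>
>>>>>> The important bit is the scaleTargetRef pointing at the custom resource instead of a plain Deployment, which is what lets the operator drive the actual job rescale.
>>>>>>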
>>>>>> I have created a POC pull request that showcases these capabilities:
>>>>>> https://github.com/apache/flink-kubernetes-operator/pull/227
>>>>>>
>>>>>> If you are interested, it would be nice if you could check it out and provide feedback; we will get back to refining this after our current ongoing release.
>>>>>>
>>>>>> Cheers,
>>>>>> Gyula
>>>>>>
>>>>>> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Talat,
>>>>>>>
>>>>>>> This is definitely an interesting and rather complex topic.
>>>>>>>
>>>>>>> A few unstructured thoughts / notes / questions:
>>>>>>>
>>>>>>> - The main struggle has always been that it's hard to come up with a generic, one-size-fits-all metric for autoscaling.
>>>>>>> - Flink doesn't have knowledge of the external environment (e.g. capacity planning on the cluster, no notion of pre-emption), so it cannot really make a qualified decision in some cases.
>>>>>>> - ^ The above goes along the same reasoning as why we don't support reactive mode with the session cluster (multi-job scheduling).
>>>>>>> - The re-scaling decision logic most likely needs to be pluggable for the above reasons.
>>>>>>> - We're in general fairly concerned about running any user code in the JM, for stability reasons.
>>>>>>> - The most flexible option would be allowing the desired parallelism to be set via the REST API and leaving the scaling decision to an external process, which could be reused for both standalone and "active" deployment modes (there is actually a prototype by Till that allows this [1]).
>>>>>>>
>>>>>>> How do you intend to make an autoscaling decision? Also note that re-scaling is still a fairly expensive operation (especially with large state), so you need to make sure the autoscaler doesn't oscillate and doesn't re-scale too often (this is also something that could vary from workload to workload).
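>>>>>>>
>>>>>>> As an aside: if a Kubernetes HPA were the external process driving this, its autoscaling/v2 "behavior" section already gives you this kind of damping. A rough, purely illustrative sketch (the window sizes and policy values are arbitrary and would have to be tuned per workload):
>>>>>>>
>>>>>>> behavior:
>>>>>>>   scaleDown:
>>>>>>>     # Scale down only on the highest recommendation seen in the last 10 minutes,
>>>>>>>     # and remove at most one replica every 5 minutes.
>>>>>>>     stabilizationWindowSeconds: 600
>>>>>>>     policies:
>>>>>>>       - type: Pods
>>>>>>>         value: 1
>>>>>>>         periodSeconds: 300
>>>>>>>   scaleUp:
>>>>>>>     stabilizationWindowSeconds: 120
>>>>>>>
>>>>>>> A custom external autoscaler would need an equivalent cool-down mechanism of its own.
>>>>>>>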
>>>>>>> Note on the metrics question with an auto-scaler living in the JM:
>>>>>>> - We shouldn't really collect the metrics into the JM; instead, the JM can pull them from the TMs directly on demand (basically the same thing an external auto-scaler would do).
>>>>>>>
>>>>>>> Looking forward to your thoughts.
>>>>>>>
>>>>>>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am working on auto-scaling support for native deployments. Today Flink provides Reactive mode, however it only runs on standalone deployments. We use the Kubernetes native deployment, so I want to increase or decrease job resources for our streaming jobs. The recent FLIP-138 and FLIP-160 are very useful for achieving this goal. I started reading the code of the Flink JobManager, AdaptiveScheduler, DeclarativeSlotPool, etc.
>>>>>>>>
>>>>>>>> My assumption is that the required resources will be calculated in the AdaptiveScheduler whenever the scheduler receives a heartbeat from a task manager, by calling the public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) method.
>>>>>>>>
>>>>>>>> I checked the TaskExecutorToJobManagerHeartbeatPayload class, however I only see *accumulatorReport* and *executionDeploymentReport*. Do you have any suggestions for collecting metrics from TaskManagers? Should I add metrics to TaskExecutorToJobManagerHeartbeatPayload?
>>>>>>>>
>>>>>>>> I am open to other suggestions for this. Whenever I finalize my investigation, I will create a FLIP with a more detailed implementation.
>>>>>>>>
>>>>>>>> Thanks for your help in advance.
>>>>>>>> Talat