Hi Yang, I thought we could enable the Adaptive Scheduler, so that adding or removing a task manager has the same effect as restarting the job when we use the adaptive scheduler. Am I missing anything?
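For context, a minimal sketch of what enabling the adaptive scheduler looks like in flink-conf.yaml (assuming a recent Flink version; the timeout values below are illustrative, not recommendations):

```yaml
# Illustrative flink-conf.yaml fragment: switch the JobManager to the
# adaptive scheduler so the job rescales when slots appear or disappear.
jobmanager.scheduler: adaptive

# How long a job waits for the desired resources before running with
# whatever is available (illustrative value).
jobmanager.adaptive-scheduler.resource-wait-timeout: 5 min

# How long the resource set must be stable before (re)scaling is
# triggered (illustrative value).
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10 s
```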
Thanks

On Tue, May 24, 2022 at 8:16 PM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks for the interesting discussion.
>
> Compared with reactive mode, leveraging the flink-kubernetes-operator to
> do the job restarting/upgrading is another solution for auto-scaling.
> Given that fully restarting a Flink application on K8s is not too slow,
> this is a reasonable way.
> I really hope we can make some progress in this area.
>
> Best,
> Yang
>
> Gyula Fóra <gyula.f...@gmail.com> wrote on Wed, May 25, 2022 at 09:04:
>
>> Hi Talat!
>>
>> It would be great to have an HPA that works based on some Flink
>> throughput/backlog metrics. I wonder how you are going to access the
>> Flink metrics in the HPA; we might need some integration with the k8s
>> metrics system.
>> In any case, whether we need a FLIP or not depends on the complexity; if
>> it's simple then we can go without a FLIP.
>>
>> Cheers,
>> Gyula
>>
>> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>
>> > Hi Gyula,
>> >
>> > This seems very promising for initial scaling. We are using the Flink
>> > Kubernetes Operator. Most probably we are very early adopters of it :)
>> > Let me try it and get back to you soon.
>> >
>> > My plan is to build a general-purpose CPU and backlog/throughput based
>> > autoscaler for Flink. I can create a custom open-source HPA on top of
>> > your changes. Do I need to create a FLIP for it?
>> >
>> > Just some general information about us: today we use another execution
>> > environment if the job scheduler does not support autoscaling. Having
>> > an HPA works if your sources are well balanced. If there is an uneven
>> > distribution across sources, having an autoscaling feature in the
>> > scheduler can help with better utilization. But this is not urgent. We
>> > can start using your PR at least for a while.
>> >
>> > Thanks
>> >
>> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >
>> >> Hi Talat!
>> >>
>> >> One other approach that we are currently investigating is combining
>> >> the Flink Kubernetes Operator
>> >> <https://github.com/apache/flink-kubernetes-operator> with the K8s
>> >> scaling capabilities (Horizontal Pod Autoscaler).
>> >>
>> >> In this approach the HPA monitors the TaskManager pods directly and
>> >> can modify the FlinkDeployment resource replica number to trigger a
>> >> stateful job scale-up/down through the operator.
>> >> Obviously not as nice as the reactive mode, but it works easily with
>> >> the current Kubernetes Native implementation. It is also theoretically
>> >> possible to integrate this with other custom Flink metrics, but we
>> >> haven't tested that yet.
>> >>
>> >> I have created a POC pull request that showcases these capabilities:
>> >> https://github.com/apache/flink-kubernetes-operator/pull/227
>> >>
>> >> If you are interested, it would be nice if you could check it out and
>> >> provide feedback; we will get back to refining this after our current
>> >> ongoing release.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <d...@apache.org> wrote:
>> >>
>> >>> Hi Talat,
>> >>>
>> >>> This is definitely an interesting and rather complex topic.
>> >>>
>> >>> A few unstructured thoughts / notes / questions:
>> >>>
>> >>> - The main struggle has always been that it's hard to come up with a
>> >>> generic, one-size-fits-all metric for autoscaling.
>> >>> - Flink doesn't have knowledge of the external environment (e.g.
>> >>> capacity planning on the cluster, no notion of pre-emption), so it
>> >>> cannot really make a qualified decision in some cases.
>> >>> - ^ the above follows the same reasoning as why we don't support
>> >>> reactive mode with the session cluster (multi-job scheduling).
>> >>> - The re-scaling decision logic most likely needs to be pluggable for
>> >>> the above reasons.
>> >>> - We're in general fairly concerned about running any user code in
>> >>> the JM for stability reasons.
>> >>> - The most flexible option would be to allow setting the desired
>> >>> parallelism via the REST API and leave the scaling decision to an
>> >>> external process, which could be reused for both standalone and
>> >>> "active" deployment modes (there is actually a prototype by Till that
>> >>> allows this [1]).
>> >>>
>> >>> How do you intend to make an autoscaling decision? Also note that
>> >>> re-scaling is still a fairly expensive operation (especially with
>> >>> large state), so you need to make sure the autoscaler doesn't
>> >>> oscillate and doesn't re-scale too often (this is also something that
>> >>> could vary from workload to workload).
>> >>>
>> >>> A note on the metrics question with an auto-scaler living in the JM:
>> >>> - We shouldn't really collect the metrics in the JM; instead the JM
>> >>> can pull them from the TMs directly on demand (basically the same
>> >>> thing an external auto-scaler would do).
>> >>>
>> >>> Looking forward to your thoughts
>> >>>
>> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>> >>>
>> >>> Best,
>> >>> D.
>> >>>
>> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>> >>>
>> >>> > Hi,
>> >>> > I am working on autoscaling support for native deployments. Today
>> >>> > Flink provides reactive mode, however it only runs on standalone
>> >>> > deployments. We use the Kubernetes native deployment, so I want to
>> >>> > be able to increase or decrease job resources for our streaming
>> >>> > jobs. The recent FLIP-138 and FLIP-160 are very useful for
>> >>> > achieving this goal. I started reading the code of the Flink
>> >>> > JobManager, AdaptiveScheduler, DeclarativeSlotPool, etc.
>> >>> >
>> >>> > My assumption is that the required resources will be calculated on
>> >>> > the AdaptiveScheduler whenever the scheduler receives a heartbeat
>> >>> > from a task manager, via the public void
>> >>> > updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) method.
>> >>> >
>> >>> > I checked the TaskExecutorToJobManagerHeartbeatPayload class,
>> >>> > however I only see *accumulatorReport* and
>> >>> > *executionDeploymentReport*. Do you have any suggestions for
>> >>> > collecting metrics from TaskManagers? Should I add metrics to
>> >>> > TaskExecutorToJobManagerHeartbeatPayload?
>> >>> >
>> >>> > I am open to other suggestions for this. Once I finalize my
>> >>> > investigation, I will create a FLIP with a more detailed
>> >>> > implementation.
>> >>> >
>> >>> > Thanks for your help in advance.
>> >>> > Talat
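As a rough illustration of the HPA-based approach discussed above, this is roughly what an HPA targeting the operator's FlinkDeployment resource could look like (it assumes the scale-subresource wiring from the linked POC PR; the resource names and thresholds are hypothetical):

```yaml
# Hypothetical HorizontalPodAutoscaler sketch: scales a FlinkDeployment
# managed by the flink-kubernetes-operator, assuming the CRD exposes a
# scale subresource as in the POC PR.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: my-flink-job-hpa        # hypothetical name
spec:
  scaleTargetRef:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    name: my-flink-job          # hypothetical FlinkDeployment
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70   # illustrative threshold
```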