Hello David,

Thanks for the detailed explanation. This is really helpful!
I also had an offline discussion with Yang Wang. He also told me this could be done in the future, but it is not part of the near-term plan. As suggested, I think I can do the following to build an auto-scaling component in the Flink operator:

1. Scrape metrics from Kubernetes (pod CPU usage, memory usage, …) and from the Flink metric system (for example, by leveraging the Prometheus metric reporter [1] in Flink).
2. With the metrics from step 1, implement logic in the operator to support customized scaling policies based on user definitions and requirements. All of this should be configurable through the FlinkApplication CRD.
3. Similar to Reactive Mode, checkpointing must be enabled to support this functionality. When certain criteria are met, the Flink operator automatically triggers a stop command without manual intervention (with the checkpoint retained). It then brings the Flink application back up from the last checkpoint, using the new configuration it calculates based on the policy.

I believe this is similar to the failover process. If the application guarantees EXACTLY_ONCE semantics [2], the restarted application should still follow exactly-once semantics.

In general, I think it should work. If you see any issues with this proposal, please point them out in this thread. Thanks!
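The following is a minimal sketch of the kind of policy step 2 describes. It rests on three assumptions:

- It uses a single metric: average TaskManager CPU utilization in whole percent, as it might be scraped in step 1.
- Its bounds come from the user (for example, via the CRD).
- The class, method, and parameter names are hypothetical and are not part of Flink or of any existing operator.

The method returns the parallelism the job should run with. A result that differs from the current value is what would trigger the stop-and-restart cycle in step 3.

```java
/**
 * Hypothetical threshold-based scaling policy (illustration only, not an
 * existing Flink or operator API). Utilization values are whole percentages.
 */
final class ScalingPolicySketch {

    private ScalingPolicySketch() {}

    /**
     * Returns the parallelism the job should run with. If the result equals
     * currentParallelism, no stop/restart cycle is needed.
     *
     * @param currentParallelism parallelism the job is running with now
     * @param avgCpuPercent      observed average CPU utilization (0..100)
     * @param targetCpuPercent   utilization the user wants to stay around
     * @param tolerancePercent   dead band around the target to avoid flapping
     * @param minParallelism     user-defined lower bound
     * @param maxParallelism     user-defined upper bound (keep it <= the job's max parallelism)
     */
    static int decide(int currentParallelism, int avgCpuPercent, int targetCpuPercent,
                      int tolerancePercent, int minParallelism, int maxParallelism) {
        if (currentParallelism < 1 || avgCpuPercent < 0 || targetCpuPercent <= 0
                || tolerancePercent < 0 || minParallelism < 1 || minParallelism > maxParallelism) {
            throw new IllegalArgumentException("invalid scaling parameters");
        }
        // Inside the dead band: keep the current parallelism, because every
        // change costs a stop-with-checkpoint and a restart.
        if (Math.abs(avgCpuPercent - targetCpuPercent) <= tolerancePercent) {
            return currentParallelism;
        }
        // Assume load spreads evenly, so total work ~ parallelism * utilization.
        // Integer ceiling division avoids floating-point rounding surprises.
        long desired = ((long) currentParallelism * avgCpuPercent + targetCpuPercent - 1)
                / targetCpuPercent;
        // Clamp to the user-defined bounds.
        long clamped = Math.max(minParallelism, Math.min(maxParallelism, desired));
        return (int) clamped;
    }

    public static void main(String[] args) {
        System.out.println(decide(4, 90, 60, 10, 1, 128)); // scale up: ceil(360 / 60) = 6
        System.out.println(decide(8, 20, 60, 10, 1, 128)); // scale down: ceil(160 / 60) = 3
        System.out.println(decide(4, 65, 60, 10, 1, 128)); // inside dead band: stays 4
    }
}
```

The tolerance matters because each change in parallelism costs a full stop and a restore from checkpoint. Without it, utilization hovering near the target would make the job flap. The upper bound should not exceed the job's max parallelism, because keyed state is split into that many key groups and cannot be spread over more subtasks.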
Reference:
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing

Best,
Fuyao

From: David Morávek <d...@apache.org>
Date: Friday, October 29, 2021 at 23:11
To: Fuyao Li <fuyao...@oracle.com>
Cc: user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>, Robert Metzger <rmetz...@apache.org>, tonysong...@gmail.com <tonysong...@gmail.com>
Subject: [External] : Re: Possibility of supporting Reactive mode for native Kubernetes application mode

Hi Fuyao,

this is a great question ;)

1) First, let's be clear on what Reactive Mode actually is. Reactive Mode is about how Flink makes use of newly available resources. It greedily uses all of the resources that are available in your Flink cluster: if a new TaskManager joins, the job rescales. You, as the one operating Flink, are responsible for adding / removing TaskManagers. This can be done manually or by some kind of horizontal auto-scaler (from what I understand, this is what you want to achieve).

2) The native K8s integration uses active resource management. The standalone deployment mode uses passive resource management, which means that YOU are responsible for resource allocation (manually starting up processes). The native K8s integration, on the other hand, allocates the resources the job needs for you, based on static configuration. Right now there is no mechanism that can decide to add / remove TaskManagers (e.g. based on some metric). -> This is most likely the missing part.

3) Summing up the above... In theory, "reactive mode" could be implemented for application mode with active resource management (native K8s / YARN). However, this would require either that Flink provide a generic auto-scaler component embedded within the JobMaster, or that users be allowed to provide their own.
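To make point 1 concrete, here is a simplified model of how Reactive Mode sizes a job. Each operator gets as many subtasks as there are slots in the cluster, capped by its max parallelism, so adding a TaskManager raises the job's parallelism. The model assumes that all operators share one slot sharing group, and it ignores the other inputs the adaptive scheduler considers. The class and operator names are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simplified, illustrative model of Reactive Mode parallelism: every operator
 * runs with as many subtasks as there are slots, capped by its max parallelism.
 * Assumes a single slot sharing group; not the adaptive scheduler's real code.
 */
final class ReactiveParallelismSketch {

    private ReactiveParallelismSketch() {}

    static Map<String, Integer> parallelismFor(int taskManagers, int slotsPerTaskManager,
                                               Map<String, Integer> maxParallelismByOperator) {
        if (taskManagers < 0 || slotsPerTaskManager < 1) {
            throw new IllegalArgumentException("invalid cluster size");
        }
        // With slot sharing, one slot can host one subtask of every operator.
        int availableSlots = taskManagers * slotsPerTaskManager;
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : maxParallelismByOperator.entrySet()) {
            // Greedy: use every available slot, but never exceed max parallelism.
            result.put(e.getKey(), Math.min(availableSlots, e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> maxParallelism = new LinkedHashMap<>();
        maxParallelism.put("source", 4);       // hypothetical operator with max parallelism 4
        maxParallelism.put("window-agg", 128); // hypothetical operator with max parallelism 128
        // 3 TaskManagers x 2 slots = 6 slots
        System.out.println(parallelismFor(3, 2, maxParallelism)); // {source=4, window-agg=6}
        // A fourth TaskManager joins: 8 slots, so the job rescales
        System.out.println(parallelismFor(4, 2, maxParallelism)); // {source=4, window-agg=8}
    }
}
```

The sketch shows why the decision of *when* to add a TaskManager lives outside Reactive Mode. The mode only reacts to the slot count it is given.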
Such a component would require some design discussion, as the ideas we have about it are still at the "theoretical level". Even though this may be a future development, there are currently no plans to do it. The main focus in this area is now on making Reactive Mode / the adaptive scheduler production ready (user metrics, fixing the UI, faster TM loss / disconnect detection, local recovery). We are also working on speeding up Flink's recovery mechanism, so that the rescaling experience is much smoother.

Best,
D.

On Wed, Oct 27, 2021 at 11:57 PM Fuyao Li <fuyao...@oracle.com> wrote:

Hello Community,

I am looking into Reactive Mode for Flink deployments. I noticed that it is supported in a Kubernetes environment, but only for standalone Kubernetes as of now. I have read some previous discussion threads regarding this issue, see [1][2][3][4][5][6].

Question 1: This feature seems to be available only for standalone k8s, not yet for native Kubernetes. The reasons appear to be some interface and design considerations mentioned by Robert and Xintong [4] and in the official docs [5]. However, I believe that in theory it could be added to native Kubernetes, right? Will this be part of the future plan? If not, what is the restriction, and is it a hard restriction?

Question 2: I have built a native Kubernetes operator on top of Yang's work [7]. It supports various state transitions in native k8s application mode and session mode. Right now, I am looking to add features similar to reactive scaling for native k8s. From my perspective, what I can do is enable periodic savepoints and scale up/down based on certain metrics we collect inside the Flink application. Implementing such a feature requires some additional resource considerations, similar to the adaptive scheduler concept in [9][10]. I didn't dive deep into that; I guess I just need to calculate whether the new TMs will be offered sufficient k8s resources if the rescale happens?
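Here is a rough sketch of the resource check asked about in Question 2. It computes how many TaskManagers a target parallelism needs, then counts how many TaskManager pods of a given size fit on each node. It rests on these assumptions:

- All TaskManager pods are identical.
- Each node's free capacity already includes the resources that are released when the current job is stopped.
- There are no other scheduling constraints (affinity, taints, quotas).
- All names are hypothetical.

```java
import java.util.List;

/** Hypothetical capacity check before a rescale (illustration only). */
final class TaskManagerCapacitySketch {

    private TaskManagerCapacitySketch() {}

    /** Free allocatable resources on one Kubernetes node (hypothetical input). */
    static final class NodeCapacity {
        final long freeMilliCpu;
        final long freeMemoryMiB;

        NodeCapacity(long freeMilliCpu, long freeMemoryMiB) {
            this.freeMilliCpu = freeMilliCpu;
            this.freeMemoryMiB = freeMemoryMiB;
        }
    }

    /** Number of TaskManagers needed to provide one slot per subtask. */
    static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        if (parallelism < 1 || slotsPerTaskManager < 1) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager; // ceiling division
    }

    /**
     * True if the identical TaskManager pods needed for targetParallelism fit on the nodes.
     * Node capacity is assumed to already include resources freed by stopping the job.
     */
    static boolean canScaleTo(int targetParallelism, int slotsPerTaskManager,
                              long tmMilliCpu, long tmMemoryMiB, List<NodeCapacity> nodes) {
        if (tmMilliCpu <= 0 || tmMemoryMiB <= 0) {
            throw new IllegalArgumentException("TaskManager resources must be positive");
        }
        int needed = taskManagersNeeded(targetParallelism, slotsPerTaskManager);
        long fits = 0;
        for (NodeCapacity node : nodes) {
            // A pod fits on a node only if both CPU and memory suffice.
            fits += Math.min(node.freeMilliCpu / tmMilliCpu, node.freeMemoryMiB / tmMemoryMiB);
        }
        return fits >= needed;
    }

    public static void main(String[] args) {
        List<NodeCapacity> nodes = List.of(new NodeCapacity(2000, 8192), new NodeCapacity(1500, 2048));
        System.out.println(taskManagersNeeded(10, 4));                // 3
        System.out.println(canScaleTo(10, 4, 1000, 2048, nodes));     // true: 2 + 1 pods fit
        System.out.println(canScaleTo(13, 4, 1000, 2048, nodes));     // false: needs 4 pods
    }
}
```

Counting per node, rather than summing free capacity across the cluster, avoids over-estimating. Free capacity fragmented across nodes can add up to enough in total while still not fitting a single pod anywhere.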
As for using savepoints rather than checkpoints: I think that, as a user/operator, I am not supposed to be able to recover/restart a job from a checkpoint [8]. I guess this might cause some performance loss, since savepoints are more expensive and the Flink application would have to take both savepoints and checkpoints periodically… Is there any way for users to also restart and recover from checkpoints? If Question 1 becomes part of the future plan, I guess I won't need to do much work here.

Reference:
[1] Reactive Mode blog: https://flink.apache.org/2021/05/06/reactive-mode.html
[2] Example usage of reactive scaling: https://github.com/rmetzger/flink-reactive-mode-k8s-demo
[3] FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[4] Discussion thread: https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
[5] Flink doc: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
[6] Flink Jira: https://issues.apache.org/jira/browse/FLINK-10407
[7] https://github.com/wangyang0918/flink-native-k8s-operator
[8] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#difference-to-savepoints
[9] https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
[10] https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler

Thanks,
Fuyao