Hello David,

Thanks for the detailed explanation. This is really helpful!

I also had an offline discussion with Yang Wang. He told me this could be 
done in the future, but it is not part of the current plan.

As suggested, I think I can do the following to build an auto-scaling 
component in the Flink operator:


  1.  Scrape metrics from Kubernetes (Pod CPU usage/memory usage/…) and from the 
Flink metrics system (for example, by leveraging the Prometheus metric reporter 
[1] in Flink).
  2.  Using the metrics from step (1), implement logic in the operator that 
supports customized scaling policies based on user definitions and 
requirements. All of this should be configurable through the FlinkApplication 
CRD.
  3.  As with Reactive Mode, checkpointing must be enabled to support this 
functionality. When certain criteria are met, the Flink operator will 
automatically trigger a stop command without manual intervention (with the 
checkpoint retained) and bring the Flink application back up from the last 
checkpoint with the new configuration it calculates from the policy. I believe 
this is similar to the failover process: if the application guarantees 
EXACTLY_ONCE semantics [2], the restarted application should still provide 
exactly-once semantics.
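A minimal sketch of the restart half of step (3), assuming checkpoints are retained when the job is stopped (for example with `execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION`) and that the operator can list the job's checkpoint directory on a mounted file system. Flink writes each completed checkpoint to `<checkpoint-dir>/<job-id>/chk-<n>/` with a `_metadata` file, and the CLI can resume from such a path with `bin/flink run -s <path>`. The helper names are hypothetical, and the command is the plain `flink run` form, not the exact native Kubernetes application-mode submission.

```python
import re
from pathlib import Path
from typing import Iterable, List, Optional, Tuple

# Completed checkpoints live in directories named chk-<n> under <checkpoint-dir>/<job-id>/.
_CHK_DIR = re.compile(r"^chk-(\d+)$")


def pick_latest_checkpoint(entries: Iterable[Tuple[str, bool]]) -> Optional[str]:
    """Return the chk-<n> name with the highest n among entries that have a _metadata file.

    `entries` are (directory name, has_metadata) pairs. Names that are not chk-<n>
    (e.g. `shared`, `taskowned`) and directories without `_metadata` (e.g. an
    in-progress or aborted checkpoint) are ignored.
    """
    best_id, best_name = -1, None
    for name, has_metadata in entries:
        match = _CHK_DIR.match(name)
        if match is None or not has_metadata:
            continue
        chk_id = int(match.group(1))  # compare numerically, so chk-10 beats chk-9
        if chk_id > best_id:
            best_id, best_name = chk_id, name
    return best_name


def latest_retained_checkpoint(job_checkpoint_dir: str) -> Optional[Path]:
    """Scan <checkpoint-dir>/<job-id>/ on a locally mounted file system (assumption)."""
    root = Path(job_checkpoint_dir)
    if not root.is_dir():
        return None
    entries = [(p.name, (p / "_metadata").is_file()) for p in root.iterdir() if p.is_dir()]
    name = pick_latest_checkpoint(entries)
    return root / name if name is not None else None


def restart_command(checkpoint: Path, parallelism: int, jar: str) -> List[str]:
    """Hypothetical helper: build `flink run -s <checkpoint> -p <parallelism> <jar>`."""
    return ["flink", "run", "-s", str(checkpoint), "-p", str(parallelism), jar]
```

The operator would run the returned command (or its application-mode equivalent) with the parallelism produced by the scaling policy.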

In general, I think it should work. If you see any issues with this proposal, 
please let me know in this thread. Thanks!

References:
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing

Best,
Fuyao



From: David Morávek <d...@apache.org>
Date: Friday, October 29, 2021 at 23:11
To: Fuyao Li <fuyao...@oracle.com>
Cc: user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>, Robert 
Metzger <rmetz...@apache.org>, tonysong...@gmail.com <tonysong...@gmail.com>
Subject: [External] : Re: Possibility of supporting Reactive mode for native 
Kubernetes application mode
Hi Fuyao,

this is a great question ;)

1) First, let's be clear on what Reactive Mode actually is.

Reactive Mode is about how Flink makes use of newly available resources. It 
greedily uses all of the resources that are available in your Flink cluster (if 
a new task manager joins, the job rescales).
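To make the "greedy" behaviour concrete, here is a simplified model, assuming a single slot sharing group so that each parallel instance of the pipeline needs one slot: every operator runs at the highest parallelism the available slots allow, capped by that operator's configured max parallelism, so a joining task manager raises the parallelism on the next rescale. The function name and inputs are illustrative; the real adaptive scheduler logic is more involved.

```python
from typing import Dict


def reactive_parallelism(task_managers: int, slots_per_tm: int,
                         max_parallelism: Dict[str, int]) -> Dict[str, int]:
    """Simplified model: each operator takes all available slots, bounded by its max parallelism.

    Assumes one slot sharing group, i.e. one slot per parallel pipeline instance.
    """
    available_slots = task_managers * slots_per_tm
    return {op: min(available_slots, cap) for op, cap in max_parallelism.items()}


if __name__ == "__main__":
    caps = {"source": 128, "window": 128, "sink": 2}
    print(reactive_parallelism(2, 2, caps))  # {'source': 4, 'window': 4, 'sink': 2}
    print(reactive_parallelism(3, 2, caps))  # a third TM joins: {'source': 6, 'window': 6, 'sink': 2}
```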

You, as a Flink operator, are responsible for adding / removing task managers. 
This can be done manually or by some kind of horizontal auto-scaler (from what I 
understand, this is what you want to achieve).

2) K8s native integration leverages active resource management.

The standalone deployment mode uses passive resource management, which means 
that YOU are responsible for resource allocation (manually starting up 
processes). The K8s native integration, on the other hand, allocates the 
resources it needs for you based on a static configuration.

Right now there is no mechanism that can decide to add / remove task managers 
(e.g. based on some metric). This is most likely the missing part.
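The missing piece is essentially a control loop that turns a metric into a task manager count. One possible rule, borrowed from the Kubernetes Horizontal Pod Autoscaler, is desired = ceil(current * observed / target), skipped when the ratio is within a tolerance of 1.0 (the HPA default tolerance is 0.1). The `ScalingPolicy` fields are hypothetical CRD settings, not an existing Flink or operator API.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class ScalingPolicy:
    # Hypothetical settings a FlinkApplication CRD could expose.
    target_cpu_utilization: float  # e.g. 0.5 means 50% of the requested CPU
    min_task_managers: int
    max_task_managers: int
    tolerance: float = 0.1  # same default tolerance the Kubernetes HPA uses


def desired_task_managers(current: int, observed_cpu: float, policy: ScalingPolicy) -> int:
    """HPA-style rule: scale proportionally to observed/target, then clamp to [min, max]."""
    if policy.target_cpu_utilization <= 0:
        raise ValueError("target_cpu_utilization must be positive")
    ratio = observed_cpu / policy.target_cpu_utilization
    if abs(ratio - 1.0) <= policy.tolerance:
        return current  # close enough to the target: avoid a needless restart
    desired = math.ceil(current * ratio)
    return max(policy.min_task_managers, min(policy.max_task_managers, desired))
```

Because every rescale here restarts the job from a checkpoint, a real policy would probably also need a cooldown between decisions.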

3) Summing up the above...

In theory, "reactive mode" could be implemented for application mode with 
active resource management (K8s native / YARN), but this would require either 
Flink to provide a generic auto-scaler component embedded within the JobMaster, 
or letting users provide their own. This would need some design discussion, as 
the ideas we have about this are still at the "theoretical level".

Even though this may be a future development, there are currently no plans to 
do it. The main focus in this area is now on making Reactive Mode / the adaptive 
scheduler production ready (user metrics, fixing the UI, faster TM loss / 
disconnect detection, local recovery) and on speeding up Flink's recovery 
mechanism so that the rescaling experience is much smoother.

Best,
D.

On Wed, Oct 27, 2021 at 11:57 PM Fuyao Li <fuyao...@oracle.com> wrote:
Hello Community,

I am looking into Reactive Mode for Flink deployments. I noticed that it is 
supported in Kubernetes environments, but only for standalone Kubernetes as of 
now. I have read some previous discussion threads regarding this issue. See 
[1][2][3][4][5][6].

Question 1:
It seems that, due to some interface and design considerations mentioned by 
Robert and Xintong [4] and in the official doc [5], this feature is only 
available for standalone k8s and not for native Kubernetes right now. However, I 
believe that in theory it could be added to native Kubernetes, right? Will this 
be part of the future plan? If not, what is the restriction, and is it a hard 
restriction?

Question 2:
I have built a native Kubernetes operator on top of Yang’s work [7] supporting 
various state transfers in native k8s application mode and session mode. Right 
now, I am looking to add similar features, such as reactive scaling, for native 
k8s. From my perspective, what I can do is enable periodic savepoints and scale 
up/down based on certain metrics we collect inside the Flink application. Some 
additional resource considerations need to be addressed to implement such a 
feature, similar to the adaptive scheduler concept in [9][10]. (I didn’t dive 
deep into that; I guess I just need to calculate whether the new TMs will be 
offered sufficient k8s resources if the rescale happens?)
I think that, as a user/operator, I am not supposed to be able to 
recover/restart a job from a checkpoint [8].
I guess this might cause some performance loss, since savepoints are more 
expensive and the Flink application must take both savepoints and checkpoints 
periodically… Is there any way that a user can also use checkpoints to restart 
and recover? If Question 1 will be part of the future plan, I guess I won’t need 
much work here.
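On the resource question: because all new task managers request the same CPU and memory, a rough feasibility check is to count, per node, how many TM pods fit into its free allocatable capacity and sum those counts. This sketch only looks at resource requests and ignores affinity, taints, pod-count limits and other scheduler constraints; the names and units (millicores, MiB) are assumptions.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Resources:
    cpu_millis: int  # CPU in millicores, as in Kubernetes resource requests
    memory_mib: int  # memory in MiB


def schedulable_task_managers(tm_request: Resources, free_per_node: List[Resources]) -> int:
    """Count identical TM pods that fit into the nodes' free capacity.

    For identical pods, a node fits floor(free / request) in each dimension and the
    scarcer dimension decides; summing over nodes gives the cluster-wide count.
    """
    if tm_request.cpu_millis <= 0 or tm_request.memory_mib <= 0:
        raise ValueError("task manager request must be positive in every dimension")
    return sum(
        min(node.cpu_millis // tm_request.cpu_millis, node.memory_mib // tm_request.memory_mib)
        for node in free_per_node
    )


def can_rescale(extra_tms: int, tm_request: Resources, free_per_node: List[Resources]) -> bool:
    """True if `extra_tms` additional task managers could be placed before triggering a rescale."""
    return schedulable_task_managers(tm_request, free_per_node) >= extra_tms
```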

References:
[1] Reactive mode blog: https://flink.apache.org/2021/05/06/reactive-mode.html
[2] Example usage of reactive scaling: https://github.com/rmetzger/flink-reactive-mode-k8s-demo
[3] FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[4] Discussion thread: https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
[5] Flink doc: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
[6] Flink Jira: https://issues.apache.org/jira/browse/FLINK-10407
[7] https://github.com/wangyang0918/flink-native-k8s-operator
[8] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#difference-to-savepoints
[9] https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
[10] https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler

Thanks,
Fuyao
