Hi David, Nicolaus,

Thanks for the reply.


  1.  For your first question: yes, I want to use the checkpoint to stop and 
restart the application. I think this is similar to the Reactive mode strategy, 
right? (I don’t know the exact implementation behind Reactive mode.) From 
your description and Nicolaus’s reply, I guess this checkpoint improvement 
will benefit both Reactive mode and the workflow I designed, rather than 
breaking my proposal, right?

  *   For Nicolaus: after such a change in 1.15, do you mean the checkpoint 
can’t be used to restart a job? If that is the case, my proposal may not work 
after 1.15…
Please share the Jira link to this design if possible, and correct my statement 
if I am wrong.

Nicolaus’s suggestion of leveraging a retained checkpoint is exactly what I am 
trying to describe in my 3-step solution.

Quote from Nicolaus:
“
About your second question: You are right that taking and restoring from 
savepoints will incur a performance loss. They cannot be incremental, and 
cannot use native (low-level) data formats - for now. These issues are on the 
list of things to improve for Flink 1.15, so if the changes make it into the 
release, it may improve a lot.
You can restore a job from a retained checkpoint (provided you configured 
retained checkpoints, else they are deleted on job cancellation), see [1] 
(right below the part you linked). It should be possible to rescale using a 
retained checkpoint, despite the docs suggesting otherwise (it was uncertain 
whether this guarantee should/can be given, so it was not stated in the docs. 
This is also expected to change in the future as it is a necessity for further 
reactive mode development).

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint

”


  2.  Regarding standalone Kubernetes: I have developed a Flink native 
Kubernetes operator on top of 
https://github.com/wangyang0918/flink-native-k8s-operator .
Right now, this operator can do basically everything the Flink CLI can do for 
session mode and application mode, and more. This includes features like 
rescaling with a savepoint (stop with savepoint, then start from savepoint), 
stop with savepoint, and submitting/stopping/cancelling session jobs. All of 
these are automated through a unified Kubernetes CRD. To save time, I would 
rather not write another operator for standalone k8s… As a result, I am looking 
to add the reactive scaling function to this operator. Nevertheless, I really 
appreciate the work on Reactive mode for standalone Kubernetes.

Based on Nicolaus’s reply, I think this can work if we configure the retained 
checkpoint policy. By default, I think only one checkpoint will be retained 
(please correct me if I am wrong), so we can capture its directory and rescale 
the application from it.
See 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
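
To make the "capture the directory" step more concrete, here is a minimal 
Python sketch. It assumes the checkpoint directory sits on a filesystem the 
operator can list directly (for S3 or HDFS the listing call would differ) and 
that it follows the documented layout <checkpoint-dir>/<job-id>/chk-<n>/ with 
a _metadata file inside each completed checkpoint. The function name 
latest_retained_checkpoint is hypothetical and not part of Flink.

```python
import re
from pathlib import Path
from typing import Optional

# Completed checkpoints live in directories named chk-<n>.
_CHK_DIR = re.compile(r"^chk-(\d+)$")


def latest_retained_checkpoint(job_checkpoint_dir: str) -> Optional[Path]:
    """Return the chk-<n> directory with the highest n that has a _metadata
    file, or None if no such directory exists.

    job_checkpoint_dir is assumed to be <checkpoint-dir>/<job-id>.
    """
    best_id = -1
    best_path: Optional[Path] = None
    for child in Path(job_checkpoint_dir).iterdir():
        match = _CHK_DIR.match(child.name)
        if match is None or not child.is_dir():
            continue  # skips shared/, taskowned/ and anything else
        # Without _metadata the checkpoint cannot be used for a restore.
        if not (child / "_metadata").is_file():
            continue
        chk_id = int(match.group(1))  # numeric compare, so chk-10 > chk-2
        if chk_id > best_id:
            best_id, best_path = chk_id, child
    return best_path
```

The returned path is what would then be passed to `bin/flink run -s <path>` 
when bringing the job back up. The sketch does not cover stopping the job 
cleanly before the directory is read.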



  3.  I removed the [EXTERNAL] tag from the email subject. It is added 
automatically by my company’s mail system. Sorry for the confusion.


Best Regards,
Fuyao

From: David Morávek <d...@apache.org>
Date: Tuesday, November 2, 2021 at 05:53
To: Fuyao Li <fuyao...@oracle.com>
Cc: user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>, Robert 
Metzger <rmetz...@apache.org>, tonysong...@gmail.com <tonysong...@gmail.com>, 
Sandeep Sooryaprakash <sandeep.sooryaprak...@oracle.com>
Subject: Re: [External] : Re: Possibility of supporting Reactive mode for 
native Kubernetes application mode
Similar to Reactive mode, checkpoint must be enabled to support such 
functionality. ...

Wouldn't that mean tearing down the whole Flink cluster in order to re-scale? 
That could be quite costly. We're aiming to speed up the recovery process for 
reactive mode, and this would most likely block you from leveraging these 
efforts.

Maybe let's take a step back: is there anything keeping you from using a 
standalone deployment here? If you already have a k8s operator, you could 
simply spawn a new JM / TM deployment without delegating this responsibility to 
Flink. Then the autoscaling could be as simple as creating a custom k8s metrics 
server [1] (or simply using the Prometheus-based one) and setting up the HPA 
for task managers.
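
For reference, the replica count the HPA aims for follows the formula in the 
Kubernetes documentation: desiredReplicas = ceil(currentReplicas * 
currentMetricValue / desiredMetricValue), with a default tolerance of 0.1 
around a ratio of 1.0. Below is a small Python sketch of that arithmetic 
applied to task managers. The function name and the min/max bounds are 
illustrative assumptions, and the real HPA also handles details (stabilization 
windows, missing metrics) that are left out here.

```python
import math


def hpa_desired_replicas(
    current_replicas: int,
    current_metric: float,
    target_metric: float,
    tolerance: float = 0.1,   # Kubernetes default tolerance
    min_replicas: int = 1,    # assumed bounds, normally set on the HPA object
    max_replicas: int = 10,
) -> int:
    """Compute the task manager count the HPA formula would ask for."""
    ratio = current_metric / target_metric
    # Within tolerance of the target: leave the deployment alone.
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    desired = math.ceil(current_replicas * ratio)
    # Clamp to the configured bounds.
    return max(min_replicas, min(max_replicas, desired))
```

For example, 4 task managers at a metric value of 90 against a target of 60 
give a ratio of 1.5, so the sketch asks for 6 task managers.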

Also, I think it will be tricky to simply stop a job and retrieve the latest 
retained checkpoint in your operator. We'll try to make this easier to achieve 
in the 1.15 release, but AFAIK there is currently no easy way to do so.

PS: Can you please do something about the "[EXTERNAL]" labels in the email 
subject? They break email threading and make it harder to keep track of the 
ongoing conversations.

[1] 
https://github.com/kubernetes-sigs/custom-metrics-apiserver

Best,
D.


On Tue, Nov 2, 2021 at 12:55 AM Fuyao Li <fuyao...@oracle.com> wrote:
Hello David,

Thanks for the detailed explanation. This is really helpful!

I also had an offline discussion with Yang Wang. He also told me this could be 
done in the future, but it is not part of the near-term plan.

As suggested, I think I can do the following to build an auto-scaling 
component into the Flink operator.


  1.  Scrape metrics from Kubernetes (pod CPU usage, memory usage, …) and from 
the Flink metrics system (for example, leveraging the Prometheus metric 
reporter [1] in Flink).
  2.  With the metrics from step 1, implement logic in the operator to support 
a customized scaling policy based on user definitions and requirements. All of 
this should be configurable through the FlinkApplication CRD.
  3.  Similar to Reactive mode, checkpointing must be enabled to support such 
functionality. When a certain criterion is met, the Flink operator will 
automatically trigger a stop command without manual intervention (the 
checkpoint is retained) and bring the Flink application back up from the last 
checkpoint with the new configuration it calculates based on the policy. I 
believe this is similar to the failover process: if the application guarantees 
EXACTLY_ONCE semantics [2], I think the restarted application should still 
provide exactly-once semantics.
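
Steps 2 and 3 above could be sketched roughly as follows in Python. This is 
only an illustration under assumptions: the ScalingPolicy fields, the 
thresholds, and the function names are hypothetical (in the proposal they 
would come from the FlinkApplication CRD), and the stop/restart actions are 
represented as plain text steps rather than real operator or Flink API calls.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ScalingPolicy:
    # Hypothetical user-defined policy; all values are illustrative.
    scale_up_cpu: float = 0.80    # average CPU above this -> scale up
    scale_down_cpu: float = 0.30  # average CPU below this -> scale down
    step: int = 1                 # parallelism change per decision
    min_parallelism: int = 1
    max_parallelism: int = 8


def next_parallelism(current: int, avg_cpu: float, policy: ScalingPolicy) -> int:
    """Step 2: turn a scraped metric into a target parallelism."""
    if avg_cpu > policy.scale_up_cpu:
        target = current + policy.step
    elif avg_cpu < policy.scale_down_cpu:
        target = current - policy.step
    else:
        target = current
    # Never leave the range the user allowed.
    return max(policy.min_parallelism, min(policy.max_parallelism, target))


def plan_rescale(current: int, avg_cpu: float, policy: ScalingPolicy) -> List[str]:
    """Step 3: list the actions the operator would take, if any."""
    target = next_parallelism(current, avg_cpu, policy)
    if target == current:
        return []  # criterion not met, keep the job running
    return [
        "stop the job, keeping the latest retained checkpoint",
        f"restart from the retained checkpoint with parallelism {target}",
    ]
```

A threshold policy with a fixed step is just one possible choice; a 
proportional rule or a cooldown between decisions could be plugged into 
next_parallelism without changing the rest.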

In general, I think it should work. If you think there are any issues with 
this proposal, please point them out in this thread. Thanks!

Reference:
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing

Best,
Fuyao



From: David Morávek <d...@apache.org>
Date: Friday, October 29, 2021 at 23:11
To: Fuyao Li <fuyao...@oracle.com>
Cc: user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>, Robert 
Metzger <rmetz...@apache.org>, tonysong...@gmail.com <tonysong...@gmail.com>
Subject: [External] : Re: Possibility of supporting Reactive mode for native 
Kubernetes application mode
Hi Fuyao,

this is a great question ;)

1) First let's be clear on what the reactive mode actually is.

Reactive Mode is about how Flink makes use of newly available resources. It 
greedily uses all of the resources that are available in your Flink cluster 
(if a new task manager joins, it re-scales).

You as a Flink operator are responsible for adding / removing the task 
managers. This can be done manually or by some kind of horizontal auto-scaler 
(from what I understand this is what you want to achieve).

2) K8s native integration leverages active resource management.

The standalone deployment mode uses passive resource management, which means 
that YOU are responsible for the resource allocation (manually starting up 
processes). K8s native integration, on the other hand, allocates the resources 
it needs for you, based on the static configuration.

Right now there is no mechanism that would be able to make a decision to add / 
remove task managers (e.g. based on some metric). -> This is most likely the 
missing part.

3) Summing up the above...

In theory "reactive mode" could be implemented for the application mode with 
active resource management (k8s native / yarn), but this would require either 
Flink providing a generic auto-scaler component embedded within the JobMaster, 
or letting users provide their own. This would require some design discussion, 
as the ideas we have about this are still on the "theoretical level".

Even though this may be a future development, there are currently no plans for 
doing that. The main focus in this area is now on making the reactive mode / 
adaptive scheduler production ready (user metrics, fixing the UI, faster TM 
loss / disconnect detection, local recovery) and on speeding up the Flink 
recovery mechanism so the re-scaling experience is much smoother.

Best,
D.

On Wed, Oct 27, 2021 at 11:57 PM Fuyao Li <fuyao...@oracle.com> wrote:
Hello Community,

I am looking into Reactive mode for Flink deployments. I noticed that it is 
supported in Kubernetes environments, but only for standalone Kubernetes as of 
now. I have read some previous discussion threads regarding this issue. See 
[1][2][3][4][5][6].

Question 1:
It seems that, due to some interface and design considerations mentioned by 
Robert and Xintong [4] and in the official docs [5], this feature is only 
available for standalone k8s and not for native Kubernetes right now. However, 
I believe that in theory it could be added to native Kubernetes, right? Will 
this be part of the future plan? If not, what is the restriction, and is it a 
hard restriction?

Question 2:
I have built a native Kubernetes operator on top of Yang’s work [7], supporting 
various state transfers in native k8s application mode and session mode. Right 
now, I am looking to add features similar to reactive scaling for native k8s. 
From my perspective, what I can do is enable periodic savepoints and scale 
up/down based on certain metrics we collect inside the Flink application. Some 
additional resource considerations need to be added to implement such a 
feature, similar to the adaptive scheduler concept in [9][10] (I didn’t dive 
deep into that; I guess I just need to calculate whether the new TMs will be 
offered sufficient k8s resources if the rescale happens?).
I think that, as a user/operator, I am not supposed to be able to 
recover/restart a job from a checkpoint [8].
I guess this might cause some performance loss, since savepoints are more 
expensive and the Flink application must take both savepoints and checkpoints 
periodically… Is there any way for a user to also use checkpoints to restart 
and recover? If Question 1 will be part of the future plan, I guess I won’t 
need much work here.

Reference:
[1] Reactive mode blog: 
https://flink.apache.org/2021/05/06/reactive-mode.html
[2] Example usage of reactive scaling: 
https://github.com/rmetzger/flink-reactive-mode-k8s-demo
[3] FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[4] Discussion thread: 
https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
[5] Flink doc: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
[6] Flink Jira: 
https://issues.apache.org/jira/browse/FLINK-10407
[7] 
https://github.com/wangyang0918/flink-native-k8s-operator
[8] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#difference-to-savepoints
[9] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
[10] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler

Thanks,
Fuyao
