Hi Fuyao, I just wanted to say that the performance loss that you rightly suspected when using savepoints (as opposed to checkpoints) may disappear with Flink 1.15. There should be no loss of functionality as far as checkpoints are concerned. I don't think the savepoint performance improvement goals are in the public Flink Jira yet.
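As a side note, the retained-checkpoint settings discussed in this thread live in flink-conf.yaml. A minimal sketch (the checkpoint directory is just a placeholder):

```yaml
# Keep only the single latest checkpoint (this is the default), so the
# configured checkpoint directory always holds the most recent one.
state.checkpoints.num-retained: 1

# Retain the checkpoint when the job is cancelled, so it can be used
# to restart the job later.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

# Placeholder directory for externalized checkpoints.
state.checkpoints.dir: s3://my-bucket/checkpoints

# Resuming then works like resuming from a savepoint, e.g.:
#   bin/flink run -s :checkpointMetaDataPath [:runArgs]
```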
You are right about the default value of 1 checkpoint that is retained, but you can change it if required: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#state-checkpoints-num-retained Now that I think about it, you probably want to keep it at 1 intentionally (I think that's what you implied) - so you can be sure that the single checkpoint in the provided checkpoint directory is the latest retained checkpoint. I haven't attempted something like this before, but it sounds like it should work. Best, Nico On Tue, Nov 2, 2021 at 10:14 PM Fuyao Li <fuyao...@oracle.com> wrote: > Hi David, Nicolaus, > > > > Thanks for the reply. > > > > 1. For your first question, yes. I want to use the checkpoint to stop > and restart the application. I think this is similar to the Reactive mode > strategy, right? (I don’t know the exact implementation behind the Reactive > mode.) From your description and Nicolaus’s reply, I guess this improvement > for checkpoints will benefit both Reactive mode and this workflow I designed > instead of breaking this proposal, right? > > > - *For Nicolaus: after such a change in 1.15, do you mean the checkpoint > can’t be used to restart a job? If this is the case, maybe my proposal will > not work after 1.15…* > > Please share the Jira link to this design if possible, and correct my > statement if I am wrong. > > > > Nicolaus’s suggestion of leveraging retained checkpoints is exactly what I > am trying to describe in my 3-step solution. > > > > Quote from Nicolaus: > > “ > > About your second question: You are right that taking and restoring from > savepoints will incur a performance loss. They cannot be incremental, and > cannot use native (low-level) data formats - for now. These issues are on > the list of things to improve for Flink 1.15, so if the changes make it > into the release, it may improve a lot. 
> > You can restore a job from a retained checkpoint (provided you configured > retained checkpoints, else they are deleted on job cancellation), see [1] > (right below the part you linked). It should be possible to rescale using a > retained checkpoint, despite the docs suggesting otherwise (it was > uncertain whether this guarantee should/can be given, so it was not stated > in the docs. This is also expected to change in the future as it is a > necessity for further reactive mode development). > > > > [1] > *https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint* > > > > ” > > > > 1. For the standalone Kubernetes problem: I have developed a Flink *native > Kubernetes* operator on top of > https://github.com/wangyang0918/flink-native-k8s-operator > . Right now, this operator can basically achieve everything the Flink CLI could > do for session mode and application mode, and more. This includes features > like rescaling with savepoint (stop with savepoint and start from > savepoint), stop with savepoint, submit/stop/cancel session jobs, etc. All > of these are automated through a unified Kubernetes CRD. For the sake of time, > I don’t want to write another operator for standalone k8s… As a > result, I am seeking to add the reactive scaling function into this > operator. Nevertheless, I really appreciate the work for reactive mode in > standalone Kubernetes. > > > > Based on Nicolaus’s reply, I think we can configure the checkpoint retention > policy. By default, I think only one checkpoint will be retained (please > correct me if I am wrong), and we can capture the directory and rescale the > application. 
> > See > https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints > > > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention > > > https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint > > > > > > 1. I removed the [EXTERNAL] tag in the email. This is something > automatically added by the company’s mail system. Sorry for the confusion. > > > > > > Best Regards, > > Fuyao > > > > *From: *David Morávek <d...@apache.org> > *Date: *Tuesday, November 2, 2021 at 05:53 > *To: *Fuyao Li <fuyao...@oracle.com> > *Cc: *user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>, > Robert Metzger <rmetz...@apache.org>, tonysong...@gmail.com < > tonysong...@gmail.com>, Sandeep Sooryaprakash < > sandeep.sooryaprak...@oracle.com> > *Subject: *Re: [External] : Re: Possibility of supporting Reactive mode > for native Kubernetes application mode > > Similar to Reactive mode, checkpointing must be enabled to support such > functionality. ... > > > > Wouldn't that mean tearing down the whole Flink cluster in order to > re-scale? That could be quite costly. We're aiming to speed up the recovery > process for the reactive mode, and this would most likely block you from > leveraging these efforts. > > > > Maybe let's take a step back: is there anything keeping you from using > standalone deployment here? If you already have a k8s operator, you could > simply spawn a new JM / TM deployment without delegating this > responsibility to Flink. Then the autoscaling could be as simple as > creating a custom k8s metrics server [1] (or simply using the Prometheus-based > one) and setting up the HPA for task managers. > > > > Also I think it will be tricky to simply stop a job and retrieve the > latest retained checkpoint in your operator. 
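The HPA route David describes could look roughly like this. A sketch only: the Deployment name `flink-taskmanager` and the thresholds are hypothetical, and a real setup would more likely scale on Flink metrics exposed through a custom metrics server than on raw CPU:

```yaml
# Hypothetical HPA scaling a standalone TaskManager Deployment on CPU.
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-taskmanager
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-taskmanager   # hypothetical TM Deployment name
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```

With Reactive Mode, adding TM replicas this way is enough to trigger a re-scale, since the job greedily uses whatever task managers join the cluster.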
We'll try to make this easier > to achieve in the 1.15 release, but AFAIK there is currently no easy way to do > so. > > > > PS: Can you please do something about the "[EXTERNAL]" labels in the email > subject? It breaks email threading and makes it harder to keep track of > the ongoing conversations. > > > > [1] https://github.com/kubernetes-sigs/custom-metrics-apiserver > > > > Best, > > D. > > > > > > On Tue, Nov 2, 2021 at 12:55 AM Fuyao Li <fuyao...@oracle.com> wrote: > > Hello David, > > > > Thanks for the detailed explanation. This is really helpful! > > > > I also had an offline discussion with Yang Wang. He also told me this > could be done in the future, but it is not part of the recent plan. > > > > As suggested, I think I can do the following things to achieve an > auto-scaling component in the Flink operator. > > > > 1. Scrape metrics from Kubernetes (pod CPU usage/memory usage/…) and > the Flink metrics system (for example, leveraging the Prometheus metric exporter [1] > in Flink) > 2. With the metrics from step (1), implement logic in the operator to support > a customized scaling policy based on user definition and requirements. All > these things should be configurable by the FlinkApplication CRD. > 3. Similar to Reactive mode, checkpointing must be enabled to support > such functionality. When a certain criterion is met, the Flink operator will > automatically trigger a stop command without manual intervention > (the checkpoint is retained) and bring up the Flink application from the last > checkpoint with the new configuration it calculates based on the policy. I > believe this is similar to the failover process; if the application guarantees > EXACTLY_ONCE semantics [2], I think the restarted application should still > follow the exactly-once semantics. > > > > In general, I think it should work. 
If you think there are any issues with > this proposal, please comment in this thread. Thanks! > > > > Reference: > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus > > [2] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing > > > > Best, > > Fuyao > > > > > > > > *From: *David Morávek <d...@apache.org> > *Date: *Friday, October 29, 2021 at 23:11 > *To: *Fuyao Li <fuyao...@oracle.com> > *Cc: *user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>, > Robert Metzger <rmetz...@apache.org>, tonysong...@gmail.com < > tonysong...@gmail.com> > *Subject: *[External] : Re: Possibility of supporting Reactive mode for > native Kubernetes application mode > > Hi Fuyao, > > > > this is a great question ;) > > > > 1) First let's be clear on what the reactive mode actually is. > > > > Reactive Mode is related to how Flink makes use of newly available > resources. It greedily uses all of the resources that are available in your > Flink cluster (if a new task manager joins, it re-scales). > > > > You as a Flink operator are responsible for adding / removing the task > managers. This can be done manually or by some kind of horizontal > auto-scaler (from what I understand, this is what you want to achieve). > > > > 2) K8s native integration leverages active resource management. 
> > > > The standalone deployment mode uses passive resource management, which > means that YOU are responsible for the resource allocation (manually > starting up processes). K8s native integration, on the other hand, allocates > resources for you based on the static configuration it needs. > > > > Right now there is no mechanism that would be able to make a decision to > add / remove task managers (e.g. based on some metric). -> This is most > likely the missing part. > > > > 3) Summing up the above... > > > > In theory, "reactive mode" could be implemented for the application mode > with active resource management (k8s native / yarn), but this would either > require Flink to provide a generic *auto-scaler component* that would be > embedded within the JobMaster, or let the user provide their own. This > would require some design discussion, as the ideas we have about this are > still on the "theoretical level". > > > > Even though this may be a future development, there are currently no plans > to do that. The main focus in this area is now on making the reactive mode / > adaptive scheduler production-ready (user metrics, fixing the UI, faster TM > loss / disconnect detection, local recovery) and on speeding up the Flink > recovery mechanism so the re-scaling experience is much smoother. > > > > Best, > > D. > > > > On Wed, Oct 27, 2021 at 11:57 PM Fuyao Li <fuyao...@oracle.com> wrote: > > Hello Community, > > > > I am checking the reactive mode for Flink deployment. I noticed that this > is supported in the Kubernetes environment, but only for standalone Kubernetes > as of now. I have read some previous discussion threads regarding this > issue. See [1][2][3][4][5][6]. > > > > Question 1: > > It seems that, due to some interface and design considerations [4] > mentioned by Robert and Xintong and the official doc [5], this feature is only > for standalone k8s and is not available for native Kubernetes now. 
> However, I believe that, in theory, it could be added to native > Kubernetes, right? Will this be part of the future plan? If not, what is > the restriction, and is it a hard restriction? > > > > Question 2: > > I have built a native Kubernetes operator on top of Yang’s work [7], > supporting various state transfers in native k8s application mode and > session mode. Right now, I am seeking to add some similar features, like > reactive scaling, for native k8s. From my perspective, what I can do is to > enable periodic savepoints and scale up/down based on certain metrics we > collect inside the Flink application. Some additional resource > considerations need to be added to implement such a feature, similar to the > adaptive scheduler concept in [9][10] (I didn’t dive deep into that; I > guess I just need to make sure the new TMs will be offered sufficient > k8s resources if the rescale happens?) > > I think, as a user/operator, I am not supposed to be able to > recover/restart a job from a checkpoint [8]. > > I guess this might cause some performance loss, since savepoints are more > expensive and the Flink application must take both savepoints and checkpoints > periodically… Is there any possible way that a user can also use checkpoints > to restart and recover? If Question 1 will be part of the future > plan, I guess I won’t need much work here. 
> > > > Reference: >
> > [1] Reactive mode blog: https://flink.apache.org/2021/05/06/reactive-mode.html >
> > [2] Example usage of reactive scaling: https://github.com/rmetzger/flink-reactive-mode-k8s-demo >
> > [3] FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode >
> > [4] Discussion thread: https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E >
> > [5] Flink doc: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/ >
> > [6] Flink Jira: https://issues.apache.org/jira/browse/FLINK-10407 >
> > [7] https://github.com/wangyang0918/flink-native-k8s-operator >
> > [8] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#difference-to-savepoints >
> > [9] https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management >
> > [10] https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler >
> > > > Thanks, > > Fuyao > >