>
> Similar to Reactive mode, checkpoint must be enabled to support such
> functionality. ...
>

Wouldn't that mean tearing down the whole Flink cluster in order to
re-scale? That could be quite costly. We're aiming to speed up the recovery
process for the reactive mode, and this approach would most likely prevent
you from leveraging these efforts.

Maybe let's take a step back: is there anything keeping you from using a
standalone deployment here? If you already have a k8s operator, you could
simply spawn the JM / TM deployments yourself instead of delegating this
responsibility to Flink. Then the autoscaling could be as simple as
creating a custom k8s metrics server [1] (or simply using the
Prometheus-based one) and setting up an HPA for the task managers.
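As a rough illustration of what that setup computes (a sketch, not how the HPA or Flink is actually implemented): the HPA's documented rule is desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue), skipped when the ratio is within a tolerance (0.1 by default) and bounded by minReplicas / maxReplicas. Reactive Mode then runs the job on whatever slots the resulting task managers offer. The function names below are hypothetical, and the parallelism helper assumes a single slot-sharing group in which every operator gets min(total slots, max parallelism).

```python
import math


def hpa_desired_replicas(current_replicas: int,
                         current_metric: float,
                         target_metric: float,
                         min_replicas: int,
                         max_replicas: int,
                         tolerance: float = 0.1) -> int:
    """Replica count suggested by the Kubernetes HPA scaling rule (sketch).

    desired = ceil(current * current_metric / target_metric), left unchanged
    when the ratio is within `tolerance` of 1.0, then clamped to
    [min_replicas, max_replicas].
    """
    ratio = current_metric / target_metric
    if abs(ratio - 1.0) <= tolerance:
        desired = current_replicas  # close enough to the target: no change
    else:
        desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


def reactive_parallelism(task_managers: int,
                         slots_per_tm: int,
                         max_parallelism: int) -> int:
    """Parallelism Reactive Mode would use, assuming one slot-sharing group:
    all available slots, capped by the operator's max parallelism."""
    return min(task_managers * slots_per_tm, max_parallelism)
```

For example, 4 task managers at 90% CPU against a 60% target give `hpa_desired_replicas(4, 90, 60, 1, 10) == 6`, and with 2 slots per task manager `reactive_parallelism(6, 2, 128) == 12`. In the standalone setup neither step lives in your operator: the HPA changes the TM replica count and Reactive Mode re-scales the job.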

Also, I think it will be tricky to simply stop a job and retrieve the latest
retained checkpoint in your operator. We'll try to make this easier to
achieve in the 1.15 release, but AFAIK there is currently no easy way to do so.
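For illustration only, a naive version of that lookup, assuming checkpoints are retained (e.g. `execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION`) and written to a filesystem the operator can read directly, using the layout `<state.checkpoints.dir>/<job-id>/chk-<n>/_metadata`. The function name is hypothetical; object stores such as S3 would need their own client. It treats the presence of `_metadata` as a completed checkpoint and does not guard against checkpoints being subsumed or cleaned up concurrently, which is part of what makes this tricky.

```python
import re
from pathlib import Path
from typing import Optional

# Assumed layout of a retained checkpoint:
#   <state.checkpoints.dir>/<job-id>/chk-<n>/_metadata
_CHK_DIR = re.compile(r"^chk-(\d+)$")


def latest_retained_checkpoint(job_checkpoint_dir: str) -> Optional[Path]:
    """Return the chk-<n> directory with the highest n that contains a
    _metadata file, or None if no such checkpoint is found."""
    root = Path(job_checkpoint_dir)
    if not root.is_dir():
        return None
    best_id, best_path = -1, None
    for child in root.iterdir():
        match = _CHK_DIR.match(child.name)
        # Skip "shared", "taskowned" and checkpoints without _metadata yet.
        if match and child.is_dir() and (child / "_metadata").is_file():
            chk_id = int(match.group(1))
            if chk_id > best_id:
                best_id, best_path = chk_id, child
    return best_path
```

The returned directory could then be passed to `flink run -s <path> ...` when resubmitting the job with the new parallelism, since a retained checkpoint can be resumed from in the same way as a savepoint.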

PS: Can you please do something about the "[EXTERNAL]" labels in the email
subject? They break email threading and make it harder to keep track of
the ongoing conversations.

[1] https://github.com/kubernetes-sigs/custom-metrics-apiserver

Best,
D.


On Tue, Nov 2, 2021 at 12:55 AM Fuyao Li <fuyao...@oracle.com> wrote:

> Hello David,
>
>
>
> Thanks for the detailed explanation. This is really helpful!
>
>
>
> I also had an offline discussion with Yang Wang. He also told me this
> could be done in the future, but it is not part of the current plans.
>
>
>
> As suggested, I think I can do the following things to build an
> auto-scaling component in the Flink operator.
>
>
>
>    1. Scrape metrics from Kubernetes (pod CPU usage, memory usage, …) and
>    from the Flink metrics system (for example, by leveraging the Prometheus
>    metric reporter [1] in Flink).
>    2. With the metrics from step 1, implement logic in the operator to
>    support customized scaling policies based on user definitions and
>    requirements. All of this should be configurable through the
>    FlinkApplication CRD.
>    3. Similar to Reactive mode, checkpointing must be enabled to support
>    this functionality. When certain criteria are met, the Flink operator
>    will automatically trigger a stop command without manual intervention
>    (retaining the checkpoint) and bring up the Flink application from the
>    last checkpoint with the new configuration it calculates based on the
>    policy. I believe this is similar to the failover process: if the
>    application guarantees EXACTLY_ONCE semantics [2], the restarted
>    application should still provide exactly-once semantics.
>
>
>
> In general, I think it should work. If you see any issues with
> this proposal, please point them out in this thread. Thanks!
>
>
>
> Reference:
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
>
>
>
> Best,
>
> Fuyao
>
>
>
>
>
>
>
> *From: *David Morávek <d...@apache.org>
> *Date: *Friday, October 29, 2021 at 23:11
> *To: *Fuyao Li <fuyao...@oracle.com>
> *Cc: *user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>,
> Robert Metzger <rmetz...@apache.org>, tonysong...@gmail.com <
> tonysong...@gmail.com>
> *Subject: *[External] : Re: Possibility of supporting Reactive mode for
> native Kubernetes application mode
>
> Hi Fuyao,
>
>
>
> this is a great question ;)
>
>
>
> 1) First, let's be clear on what reactive mode actually is.
>
>
>
> Reactive Mode is about how Flink makes use of newly available
> resources. It greedily uses all of the resources that are available in your
> Flink cluster (if a new task manager joins, it re-scales).
>
>
>
> You as a Flink operator are responsible for adding / removing the task
> managers. This can be done manually or by some kind of horizontal
> auto-scaler (from what I understand this is what you want to achieve).
>
>
>
> 2) K8s native integration leverages active resource management.
>
>
>
> The standalone deployment mode uses passive resource management, which
> means that YOU are responsible for the resource allocation (manually
> starting up processes). K8s native integration, on the other hand,
> allocates the resources it needs for you, based on a static configuration.
>
>
>
> Right now there is no mechanism that would be able to decide to
> add / remove task managers (e.g. based on some metric). -> This is most
> likely the missing part.
>
>
>
> 3) Summing up the above...
>
>
>
> In theory "reactive mode" could be implemented for the application mode
> with active resource management (k8s native / yarn), but this would either
> require Flink to provide a generic *auto-scaler component* that would be
> embedded within the JobMaster or letting user to provide his own. This
> would require some design discussion as the ideas we have about this are
> still on the "theoretical level".
>
>
>
> Even though this may be a future development, there are currently no plans
> to do that. The main focus in this area is now on making the reactive mode /
> adaptive scheduler production ready (user metrics, fixing the UI, faster TM
> loss / disconnect detection, local recovery) and on speeding up the Flink
> recovery mechanism so that the re-scaling experience is much smoother.
>
>
>
> Best,
>
> D.
>
>
>
> On Wed, Oct 27, 2021 at 11:57 PM Fuyao Li <fuyao...@oracle.com> wrote:
>
> Hello Community,
>
>
>
> I am looking into reactive mode for Flink deployments. I noticed that it
> is supported in Kubernetes environments, but only for standalone
> Kubernetes as of now. I have read some previous discussion threads
> regarding this issue. See [1][2][3][4][5][6].
>
>
>
> Question 1:
>
> It seems that due to some interface and design considerations [4]
> mentioned by Robert and Xintong and in the official docs [5], this feature
> is only available for standalone k8s and not for native Kubernetes yet.
> However, I believe it is possible in theory to add it to native
> Kubernetes, right? Will this be part of the future plan? If not, what is
> the restriction, and is it a hard restriction?
>
>
>
> Question 2:
>
> I have built a native Kubernetes operator on top of Yang’s work [7]
> supporting various state transfers in native k8s application mode and
> session mode. Right now, I am looking to add features similar to
> reactive scaling for native k8s. From my perspective, what I can do is to
> enable periodic savepoints and scale up/down based on certain metrics we
> collect inside the Flink application. Some additional resource
> considerations need to be added to implement such a feature, similar to
> the adaptive scheduler concept in [9][10] (I didn’t dive deep into that; I
> guess I just need to check that the new TMs can be provisioned with
> sufficient k8s resources if the rescale happens?)
>
> I think as a user/operator, I am not supposed to be able to
> recover/restart a job from a checkpoint [8].
>
> I guess this might cause some performance loss, since savepoints are more
> expensive and the Flink application must take both savepoints and
> checkpoints periodically… Is there any way a user can also use
> checkpoints to restart and recover? If Question 1 becomes part of the
> future plan, I guess I won’t need much work here.
>
>
>
> Reference:
>
> [1] Reactive mode blog:
> https://flink.apache.org/2021/05/06/reactive-mode.html
>
> [2] example usage of reactive scaling:
> https://github.com/rmetzger/flink-reactive-mode-k8s-demo
>
> [3] FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
>
> [4] Discussion thread:
> https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
>
> [5] Flink doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
>
> [6] Flink Jira: https://issues.apache.org/jira/browse/FLINK-10407
>
> [7] https://github.com/wangyang0918/flink-native-k8s-operator
>
> [8]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#difference-to-savepoints
>
> [9]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>
> [10]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
>
>
>
> Thanks,
>
> Fuyao
>
>
