Hi Tamir,

Thanks for sharing the information.

I think you are right. If you look into the implementation of
flink-native-k8s-operator [1], the two approaches are essentially the same.

@Till Rohrmann <trohrm...@apache.org> Do you think it is reasonable to make
the "ApplicationDeployer" interface public? I believe that would make it
much easier to integrate with deployment systems.


[1]. https://github.com/wangyang0918/flink-native-k8s-operator

Best,
Yang


Tamir Sagi <tamir.s...@niceactimize.com> wrote on Fri, Mar 12, 2021 at 3:56 AM:

> Hey Till,
>
> You are right.
>
> I'm new to Flink and was looking for a Java way to deploy an application
> cluster. I first tried the standalone approach and then switched to native
> mode (although the official documentation says application mode is more
> suitable for production, it shows only the CLI way).
>
> I came across the k8s operator, which pointed me in a good direction, but it
> was unnecessary for us since we don't want to deal with custom resources and
> manage K8s resources ourselves.
>
> Hence, as I showed, I wrote a small program that deploys an application
> cluster (using the Flink client and the Flink Kubernetes client, which uses
> the fabric8io-kubernetes-client
> <https://github.com/fabric8io/kubernetes-client> under the hood), for
> others who come across this post.
>
> Cheers mate,
>
> Tamir.
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, March 11, 2021 7:13 PM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Application cluster - Best Practice
>
>
> What the Flink client does for you when starting an application mode
> cluster in native K8s mode is to generate the K8s job specification and to
> submit it to the K8s cluster. Hence, you can also do it yourself if you
> prefer to manage the K8s resources directly. Writing your own client should
> also work but is of course more work.
>
> Cheers,
> Till
>
> On Thu, Mar 11, 2021 at 3:49 PM Tamir Sagi <tamir.s...@niceactimize.com>
> wrote:
>
> Hey Till,
>
> Thank you for responding.
>
> I've already read the links you sent, but they are not enough; they don't
> provide a good solution for production.
>
> Standalone Kubernetes is not a good approach for production for 3 main
> reasons (in my opinion):
>
>    - TMs are defined as a Deployment, which means they stay up and running
>    even after the JobManager (a K8s Job) has completed.
>    - With these configurations, Flink is not aware of the standalone
>    Kubernetes deployment, which means you have to clean up the resources
>    yourself.
>    - IMHO, dealing with YAML files in production is bad practice.
>    Deploying an application cluster via YAML files at runtime is not a good
>    approach. (How exactly? Terraform, Kubernetes client, kubectl?)
>
>
> Native K8s *is the right* approach since it terminates all dynamic
> resources. However, the documentation shows only a deployment via the Flink
> CLI, which again is not good practice in production.
>
> Another solution is to use a Kubernetes operator (e.g.
> https://github.com/wangyang0918/flink-native-k8s-operator). However, the
> operator expects CRDs, which are defined by YAML files, and triggering a new
> Flink app is also done via YAML files.
>
>
>
> *Unfortunately, the documentation doesn't elaborate much on the Flink
> client.*
> After digging into and debugging the operator library above, I found out
> that the Flink client can deploy an application cluster.
>
> Here is a simple program that creates an *application cluster with
> dynamic resource allocation* (using the Flink client and the Flink
> Kubernetes client) in Java.
>
> *Dependencies* (e.g. flink.version=1.12.1, scala.binary.version=2.12)
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-java</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-clients_2.12</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-core</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> *Java Code:*
>
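> import lombok.extern.log4j.Log4j2;
> import org.apache.flink.client.cli.ApplicationDeployer;
> import org.apache.flink.client.deployment.ClusterClientServiceLoader;
> import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
> import org.apache.flink.client.deployment.application.ApplicationConfiguration;
> import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
>
> @Log4j2
> public class TestFlinkClientToAppCluster {
>
>     public static void main(String[] args) throws InterruptedException {
>         log.info("Client started");
>         // Deploy the application cluster through the Flink client.
>         final ClusterClientServiceLoader clusterClientServiceLoader =
>                 new DefaultClusterClientServiceLoader();
>         final ApplicationDeployer deployer =
>                 new ApplicationClusterDeployer(clusterClientServiceLoader);
>         // No program arguments and no explicit entry class.
>         final ApplicationConfiguration applicationConfiguration =
>                 new ApplicationConfiguration(new String[]{}, null);
>         try {
>             deployer.run(getEffectiveConfig(), applicationConfiguration);
>             log.info("Finished");
>         } catch (Exception e) {
>             log.error("Failed to deploy cluster {}", "flink-test", e);
>         }
>     }
>
>     // getEffectiveConfig() goes here; it is described next.
> }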
>
> where getEffectiveConfig() returns a Configuration object that represents
> a flink-conf.yaml file with all the necessary parameters.
>
> For example:
>
> Configuration effectiveConfig = new Configuration();
> // Alternatively, load from a file: GlobalConfiguration.loadConfiguration(<path>);
> URI uri = new URI("local:///path/to/artifact");
>
> effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, "default");
> effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, "flink-test");
> effectiveConfig.set(DeploymentOptions.TARGET, "kubernetes-application");
> effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
>         KubernetesConfigOptions.ServiceExposedType.ClusterIP);
> effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE,
>         "app-cluster-job-manager:1.0.1");
> effectiveConfig.set(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH,
>         "/docker-entrypoint.sh");
> effectiveConfig.set(PipelineOptions.JARS,
>         Collections.singletonList(uri.toString()));
>
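When writing the `local://` jar URI, it may help to check how `java.net.URI` splits the string, since the number of slashes changes which part is treated as the path. The following is only a minimal sketch of that parsing behaviour using the JDK; the class name `LocalJarUri` and the path `/opt/flink/usrlib/app.jar` are hypothetical, and the sketch does not show how Flink itself consumes the URI.

```java
import java.net.URI;

final class LocalJarUri {

    /** Host component java.net.URI extracts from the string, or null if there is none. */
    static String hostPart(String jarUri) {
        return URI.create(jarUri).getHost();
    }

    /** Path component java.net.URI extracts from the string. */
    static String pathPart(String jarUri) {
        return URI.create(jarUri).getPath();
    }

    public static void main(String[] args) {
        // Two slashes: the first segment is parsed as a host, not as part of the path.
        System.out.println(hostPart("local://path/to/artifact")); // path
        System.out.println(pathPart("local://path/to/artifact")); // /to/artifact

        // Three slashes: empty authority, so the whole file path is kept.
        System.out.println(hostPart("local:///opt/flink/usrlib/app.jar")); // null
        System.out.println(pathPart("local:///opt/flink/usrlib/app.jar")); // /opt/flink/usrlib/app.jar
    }
}
```

With two slashes the leading segment ends up in the host component, so the three-slash form is the one that keeps an absolute in-container path intact.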
> If specific configurations for the TM or JM are needed, you can set them
> like this:
>
> effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1024m");
> effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
> effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 500.0);
> effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "2048m");
> effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 500.0);
>
>
> If the application runs outside K8s, it might throw an error indicating
> that the pod failed to find the REST endpoint (it looks for the K8s format
> <cluster-name>-rest.<namespace>), but the cluster will be deployed anyway.
>
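The `<cluster-name>-rest.<namespace>` format mentioned above can be illustrated with a small sketch. It only builds the name and checks whether it resolves from the current machine; the class and method names are hypothetical, and the values `flink-test` and `default` are the ones used in the configuration earlier in this message. Outside the cluster such a service name will normally not resolve, which is consistent with the error described.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class RestServiceName {

    /** Builds the in-cluster name in the <cluster-name>-rest.<namespace> format. */
    static String restServiceHost(String clusterId, String namespace) {
        return clusterId + "-rest." + namespace;
    }

    /** Returns true if the host name resolves from the machine running this code. */
    static boolean isResolvable(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = restServiceHost("flink-test", "default");
        // The result depends on where this runs (inside or outside the K8s cluster).
        System.out.println(host + " resolvable from here: " + isResolvable(host));
    }
}
```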
> I deployed the application on K8s (docker-desktop).
>
>    - Make sure you provide the kube config. Since the Flink Kubernetes
>    client is running within K8s, it needs credentials to communicate with
>    the API server and create dynamic resources. Otherwise, the following
>    exception will be raised:
>    sun.security.provider.certpath.SunCertPathBuilderException: unable to find
>    valid certification path to requested target
>
> Dockerfile for the program above (config is the local Kubernetes
> credentials file located in ~/.kube/config; I copied it to the project dir):
>
> FROM openjdk:11-jdk-slim-buster
> COPY target/<jar-name>.jar /
> RUN mkdir -p /opt/files
> ADD log4j2.xml /opt/files/log4j2.xml
> ADD config  /opt/files/config
> ENV KUBECONFIG=/opt/files/config
> ENTRYPOINT ["java", "-Dlog4j.configurationFile=/opt/files/log4j2.xml", "-jar", "/<jar-name>.jar"]
>
>
> Hope that helps,
>
> Tamir.
>
>
>
>
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, March 11, 2021 3:40 PM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Application cluster - Best Practice
>
>
> Hi Tamir,
>
> 1. How you start a cluster in application mode depends on how you deploy
> Flink. If you want to use Flink's standalone application mode, then you
> have to deploy a new cluster for every job you want to run [1, 2, 3]. What
> a standalone application cluster needs is the user code jar and a pointer
> to the class to run. So if you want to deploy a standalone cluster on Docker
> or Kubernetes, then you either add the user code to your image, mount it as
> a volume or specify an init container (K8s case) to download the user code
> jar from somewhere.
>
> If you want to deploy on K8s and are able to use Flink's native
> integration (meaning that Flink can talk with the K8s cluster), you can
> also use the client to submit a new application cluster [4]. But also here,
> you have to make the user code available to your container in some way.
>
> 2. If you deploy your application cluster in standalone mode, then Flink
> won't automatically terminate the TaskManagers. This only works when using
> one of Flink's active deployments (K8s or Yarn). Hence, you either have to
> wait for the registration timeout of the TaskManager or stop them
> explicitly.
>
> 3. No, TaskManagers don't have to stay alive. The fact that they stayed
> alive is caused by the fact that you deployed Flink in standalone mode
> where Flink cannot start and stop TaskManagers.
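To put the registration timeout from point 2 into numbers: the TaskManager log quoted in this thread reports a retry every 10000 ms and a maximum registration duration of 300000 ms. The sketch below just does that arithmetic; the class and method names are hypothetical, and it assumes a constant retry interval, which the log does not guarantee.

```java
import java.time.Duration;

final class RegistrationTimeout {

    /** Upper bound on retries, assuming a constant retry interval (an assumption). */
    static long maxAttempts(long maxRegistrationMillis, long retryIntervalMillis) {
        return maxRegistrationMillis / retryIntervalMillis;
    }

    /** Converts a millisecond value to whole minutes. */
    static long toMinutes(long millis) {
        return Duration.ofMillis(millis).toMinutes();
    }

    public static void main(String[] args) {
        long maxRegistrationMillis = 300_000L; // from the RegistrationTimeoutException message
        long retryIntervalMillis = 10_000L;    // from the "retrying in 10000 ms" message

        System.out.println(toMinutes(maxRegistrationMillis) + " minutes");                       // 5 minutes
        System.out.println(maxAttempts(maxRegistrationMillis, retryIntervalMillis) + " retries"); // 30 retries
    }
}
```

So a standalone TaskManager that is not stopped explicitly would, under these values, linger for roughly five minutes after the JobManager is gone.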
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#application-mode-on-docker
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/#application-mode
> [4]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#application-mode
>
> Cheers,
> Till
>
> On Sat, Mar 6, 2021 at 6:16 PM Tamir Sagi <tamir.s...@niceactimize.com>
> wrote:
>
> Hey All,
>
> I'm running an application cluster (Flink 1.11.1) on top of
> Kubernetes (currently running locally using docker-desktop), deployed via a
> Terraform module (2 task managers).
>
> I followed these (well-written) instructions:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
>
> I have successfully executed the batch job; the Kubernetes job is marked as
> completed.
>
>
> I was reading the following article
> https://flink.apache.org/news/2020/07/14/application-mode.html
>
>
> I have several questions:
>
>    1. What is the best practice for triggering jobs on the fly? In other
>    words, how do I submit new jobs at runtime in application mode? (In a
>    session cluster I could submit jobs via the Flink client.)
>
>    2. Once the job is completed, I get the following message inside the Task
>    manager:
>
>    [2021-03-06T17:06:55,054][Info] {} [o.a.f.r.t.TaskExecutor]: Could not
>    resolve ResourceManager address
>    akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*,
>    retrying in 10000 ms: Could not connect to rpc endpoint under address
>    akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*.
>
>    It keeps trying to register with the resource manager until it crashes
>    with the error:
>
>    [2021-03-06T17:10:47,437][Error] {} [o.a.f.r.t.TaskManagerRunner]: Fatal
>    error occurred while executing the TaskManager. Shutting it down...
>    org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
>    Could not register at the ResourceManager within the specified maximum
>    registration duration 300000 ms. This indicates a problem with this
>    instance. Terminating now.
>
>    Is that normal behavior?
>
>    3. In application mode, does the Task manager have to stay alive after the
>    job has completed?
>
>
> Thanks,
> Tamir.
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>
