When starting an application mode cluster in native K8s mode, the Flink
client generates the K8s resource specifications for you and submits them to
the K8s cluster. Hence, you can also do this yourself if you prefer to
manage the K8s resources directly. Writing your own client should also work,
but is of course more effort.
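
For illustration, a rough sketch of "doing it yourself" with the fabric8
Kubernetes client (the names, labels, and image below are hypothetical, and
the specification the Flink client actually generates contains more pieces,
e.g. services, config maps, and owner references):

import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.util.Collections;
import java.util.Map;

public class ManualJobManagerDeployment {
    public static void main(String[] args) {
        Map<String, String> labels = Collections.singletonMap("app", "flink-test");

        // A minimal JobManager Deployment, built and submitted by hand.
        Deployment jobManager = new DeploymentBuilder()
                .withNewMetadata()
                    .withName("flink-test-jobmanager")
                .endMetadata()
                .withNewSpec()
                    .withReplicas(1)
                    .withNewSelector().withMatchLabels(labels).endSelector()
                    .withNewTemplate()
                        .withNewMetadata().withLabels(labels).endMetadata()
                        .withNewSpec()
                            .addNewContainer()
                                .withName("flink-jobmanager")
                                .withImage("app-cluster-job-manager:1.0.1")
                                .withArgs("jobmanager")
                            .endContainer()
                        .endSpec()
                    .endTemplate()
                .endSpec()
                .build();

        // Submit it to the cluster the current kube config points at.
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            client.apps().deployments().inNamespace("default").createOrReplace(jobManager);
        }
    }
}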

Cheers,
Till

On Thu, Mar 11, 2021 at 3:49 PM Tamir Sagi <tamir.s...@niceactimize.com>
wrote:

> Hey Till,
>
> Thank you for responding.
>
> I've already read the links you sent, but they are not enough; they don't
> provide a good solution for production.
>
> Standalone Kubernetes is not a good approach for production, for 3 main
> reasons (in my opinion):
>
>    - TMs are defined as a Deployment, which means they stay up and running
>    even after the JobManager (a K8s Job) has completed.
>    - With these configurations, Flink is not aware of the standalone
>    Kubernetes resources, which means you have to clean them up yourself.
>    - IMHO, dealing with YAML files in production is bad practice;
>    deploying an application cluster via YAML files at runtime is not a good
>    approach. (How exactly? Terraform, a Kubernetes client, kubectl?)
>
>
> Native K8s *is the right* approach since it terminates all dynamic
> resources; however, the documentation shows only a deployment via the Flink
> CLI, which again is not good practice in production.
>
> Another solution is to use a Kubernetes Operator (i.e.
> https://github.com/wangyang0918/flink-native-k8s-operator); however, the
> operator expects CRDs, which are defined by YAML files, and triggering a new
> Flink app is done via YAML files as well.
>
> *Unfortunately, the documentation doesn't elaborate much on the Flink
> client.*
> After digging into and debugging the operator lib above, I found out that
> the Flink client can deploy an application cluster.
>
> Here is a simple program that creates an *application cluster with
> dynamic resource allocation* (using the Flink client & Flink Kubernetes
> client) in Java.
>
> *Dependencies* (i.e. flink.version=1.12.1, scala.binary.version=2.12):
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-java</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-clients_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-core</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> *Java code:*
>
> @Log4j2
> public class TestFlinkClientToAppCluster {
>
>     public static void main(String[] args) throws InterruptedException {
>         log.info("Client started");
>         // Deploy the application cluster
>         final ClusterClientServiceLoader clusterClientServiceLoader =
>                 new DefaultClusterClientServiceLoader();
>         final ApplicationDeployer deployer =
>                 new ApplicationClusterDeployer(clusterClientServiceLoader);
>         final ApplicationConfiguration applicationConfiguration =
>                 new ApplicationConfiguration(new String[]{}, null);
>         try {
>             deployer.run(getEffectiveConfig(), applicationConfiguration);
>             log.info("Finished");
>         } catch (Exception e) {
>             log.error("Failed to deploy cluster {}", "flink-test", e);
>         }
>     }
> }
>
> where getEffectiveConfig() returns a Configuration object that represents a
> flink-conf.yaml file with all the necessary parameters.
>
> For example:
>
> Configuration effectiveConfig = new Configuration();
> // or load from a file: GlobalConfiguration.loadConfiguration(<path>);
> URI uri = new URI("local://path/to/artifact");
>
> effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, "default");
> effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, "flink-test");
> effectiveConfig.set(DeploymentOptions.TARGET, "kubernetes-application");
> effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
>         KubernetesConfigOptions.ServiceExposedType.ClusterIP);
> effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE,
>         "app-cluster-job-manager:1.0.1");
> effectiveConfig.set(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH,
>         "/docker-entrypoint.sh");
> effectiveConfig.set(PipelineOptions.JARS,
>         Collections.singletonList(uri.toString()));
>
> If specific configurations for the TM or JM are needed, you can configure
> them like this:
>
> effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1024m");
> effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
> // CPU is specified in cores (0.5 == 500m)
> effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 0.5);
> effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "2048m");
> effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 0.5);
>
>
> If the application runs outside K8s, it might throw an error indicating
> that the pod failed to find the REST endpoint (it looks for the K8s format
> <cluster-name>-rest.<namespace>), but the cluster will be deployed anyway.
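>
> As a follow-up sketch, the same service loader can be used to retrieve a
> ClusterClient for the deployed cluster, e.g. to list jobs or to shut the
> cluster down from outside (assuming the Flink 1.12 client APIs; "flink-test"
> is the cluster id configured above, and the snippet is expected to run in a
> method that declares throws Exception):
>
> ClusterClientFactory<String> factory =
>         clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
> try (ClusterDescriptor<String> descriptor =
>         factory.createClusterDescriptor(effectiveConfig)) {
>     ClusterClient<String> clusterClient =
>             descriptor.retrieve("flink-test").getClusterClient();
>     // List the running jobs; clusterClient.shutDownCluster() would stop
>     // the whole application cluster and its dynamic resources.
>     clusterClient.listJobs().get().forEach(job ->
>             log.info("Job {} is in state {}", job.getJobName(), job.getJobState()));
> }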
>
> I deployed the application on K8s (docker-desktop).
>
>    - Make sure you provide the kube config. Since the Flink Kubernetes
>    client is running within K8s, it needs credentials to communicate with
>    the API server and create dynamic resources; otherwise, the following
>    exception will be raised:
>    sun.security.provider.certpath.SunCertPathBuilderException: unable to
>    find valid certification path to requested target
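>
> Alternatively, instead of relying on the KUBECONFIG environment variable,
> the location can be set programmatically; a sketch, assuming the
> kubernetes.config.file option (KubernetesConfigOptions.KUBE_CONFIG_FILE) is
> available in your Flink version:
>
> // Point the Flink Kubernetes client at an explicit kube config file
> effectiveConfig.setString(KubernetesConfigOptions.KUBE_CONFIG_FILE.key(),
>         "/opt/files/config");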
>
> Dockerfile for the program above (config is the local Kubernetes
> credentials file located in ~/.kube/config; I copied it to the project dir):
>
> FROM openjdk:11-jdk-slim-buster
> COPY target/<jar-name>.jar /
> RUN mkdir -p /opt/files
> ADD log4j2.xml /opt/files/log4j2.xml
> ADD config /opt/files/config
> ENV KUBECONFIG=/opt/files/config
> ENTRYPOINT ["java", "-Dlog4j.configurationFile=/opt/files/log4j2.xml", "-jar", "/<jar-name>.jar"]
>
>
> Hope that helps,
>
> Tamir.
>
>
>
>
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, March 11, 2021 3:40 PM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Application cluster - Best Practice
>
> Hi Tamir,
>
> 1. How you start a cluster in application mode depends on how you deploy
> Flink. If you want to use Flink's standalone application mode, then you
> have to deploy a new cluster for every job you want to run [1, 2, 3]. What
> a standalone application cluster needs is the user code jar and a pointer
> to which class to run. So if you want to deploy a standalone cluster on
> Docker or Kubernetes, then you either add the user code to your image,
> mount it as a volume, or specify an init container (K8s case) to download
> the user code jar from somewhere.
>
> If you want to deploy on K8s and are able to use Flink's native
> integration (meaning that Flink can talk with the K8s cluster), you can
> also use the client to submit a new application cluster [4]. But also here,
> you have to make the user code available to your container in some way.
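>
> As a sketch of that client-side submission (the entry class and program
> arguments here are hypothetical; compare the program shown further up in
> this thread, which passes null to pick up the jar's main class):
>
> // The "pointer to which class to run" mentioned above
> ApplicationConfiguration appConfig = new ApplicationConfiguration(
>         new String[]{}, "com.example.MyFlinkJob");
> new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader())
>         .run(effectiveConfig, appConfig);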
>
> 2. If you deploy your application cluster in standalone mode, then Flink
> won't automatically terminate the TaskManagers. This only works when using
> one of Flink's active deployments (K8s or YARN). Hence, you either have to
> wait for the registration timeout of the TaskManagers or stop them
> explicitly.
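>
> The registration timeout itself is configurable; a sketch, assuming
> TaskManagerOptions.REGISTRATION_TIMEOUT (taskmanager.registration.timeout,
> which defaults to 5 min):
>
> Configuration conf = new Configuration();
> // Let standalone TaskManagers shut down after 1 minute instead of the
> // default 5 if no ResourceManager can be reached.
> conf.setString(TaskManagerOptions.REGISTRATION_TIMEOUT.key(), "1 min");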
>
> 3. No, TaskManagers don't have to stay alive. That they stayed alive is a
> consequence of deploying Flink in standalone mode, where Flink cannot start
> and stop TaskManagers.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#application-mode-on-docker
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/#application-mode
> [4]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#application-mode
>
> Cheers,
> Till
>
> On Sat, Mar 6, 2021 at 6:16 PM Tamir Sagi <tamir.s...@niceactimize.com>
> wrote:
>
> Hey All,
>
> I'm running an application cluster (Flink 1.11.1) on top of Kubernetes
> (currently running locally using docker-desktop), deployed via a Terraform
> module (2 task managers).
>
> I followed these instructions (well written):
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
>
> I have successfully executed the batch job; the Kubernetes Job is marked as
> completed.
>
>
> I was reading the following article:
> https://flink.apache.org/news/2020/07/14/application-mode.html
>
>
> I have several questions:
>
>    1. What is the best practice to trigger jobs on the fly? In other
>    words, how do I submit new jobs at runtime in application mode? (In a
>    session cluster I could submit jobs via the Flink client.)
>
>    2. Once the job is completed, I get the following message inside the
>    TaskManager:
>
>    [2021-03-06T17:06:55,054][Info] {} [o.a.f.r.t.TaskExecutor]: Could not
>    resolve ResourceManager address
>    akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*,
>    retrying in 10000 ms: Could not connect to rpc endpoint under address
>    akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*.
>
>    It tries to register with the ResourceManager until it crashes with the
>    error:
>
>    [2021-03-06T17:10:47,437][Error] {} [o.a.f.r.t.TaskManagerRunner]: Fatal
>    error occurred while executing the TaskManager. Shutting it down...
>    org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
>    Could not register at the ResourceManager within the specified maximum
>    registration duration 300000 ms. This indicates a problem with this
>    instance. Terminating now.
>
>    Is that normal behavior?
>
>
>    3. In application mode, does the TaskManager have to stay alive after
>    the job has been completed?
>
>
> Thanks,
> Tamir.
>
>
