I have created a ticket, FLINK-21807 [1], to track this requirement.

[1] https://issues.apache.org/jira/browse/FLINK-21807

Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Tue, Mar 16, 2021 at 1:11 AM:

> Hey Yang,
>
> The operator gave me a good lead by "revealing" that the Application
> Deployer does exist and that there is a way to do what I was looking for
> programmatically.
>
> Just a quick note: the operator uses the same Flink methods internally to
> deploy the application cluster, but it also provides a way to submit jobs
> via CRDs (which is not a good approach for our needs).
>
> IMHO it's important to add such an Application Deployer example to the
> documentation.
>
> Best,
> Tamir.
> ------------------------------
> *From:* Yang Wang <danrtsey...@gmail.com>
> *Sent:* Monday, March 15, 2021 11:29 AM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* Till Rohrmann <trohrm...@apache.org>; user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Application cluster - Best Practice
>
> *EXTERNAL EMAIL*
>
> Hi Tamir,
>
> Thanks for sharing the information.
>
> I think you are right. If you dive into the implementation of
> flink-native-k8s-operator [1], they are just the same.
>
> @Till Rohrmann <trohrm...@apache.org> Do you think it is reasonable to
> make the "ApplicationDeployer" interface public? Then I believe it will be
> much easier to integrate with deployment systems.
>
> [1] https://github.com/wangyang0918/flink-native-k8s-operator
>
> Best,
> Yang
>
> Tamir Sagi <tamir.s...@niceactimize.com> wrote on Fri, Mar 12, 2021 at 3:56 AM:
>
> Hey Till,
>
> You are right.
>
> I'm new to Flink and was looking for a Java way to deploy an application
> cluster. I first tried the standalone approach and then changed to native
> (although the official documents specify that application mode is more
> suitable for production, they show only the CLI way).
>
> I encountered the k8s operator (which gave me a good direction), but it
> was unnecessary since we don't want to deal with custom resources and
> manage K8s resources ourselves.
>
> Hence, as I showed, I wrote a small program that deploys an application
> cluster (using the Flink client and the Flink Kubernetes client, which uses
> the fabric8io-kubernetes-client
> <https://github.com/fabric8io/kubernetes-client> under the hood) for
> others who will encounter this post.
>
> Cheers mate,
>
> Tamir.
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, March 11, 2021 7:13 PM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Application cluster - Best Practice
>
> *EXTERNAL EMAIL*
>
> What the Flink client does for you when starting an application mode
> cluster in native K8s mode is to generate the K8s job specification and to
> submit it to the K8s cluster. Hence, you can also do it yourself if you
> prefer to manage the K8s resources directly. Writing your own client should
> also work but is of course more work.
>
> Cheers,
> Till
>
> On Thu, Mar 11, 2021 at 3:49 PM Tamir Sagi <tamir.s...@niceactimize.com> wrote:
>
> Hey Till,
>
> Thank you for responding.
>
> I've already read the links you sent, but they are not enough; they don't
> provide a good solution for production.
>
> Standalone Kubernetes is not a good approach for production for 3 main
> reasons (in my opinion):
>
>    - TMs are defined as a Deployment, which means they stay up and running
>    even after the job manager (K8s Job) has completed.
>    - With these configurations, Flink is not aware of the standalone
>    Kubernetes deployment, which means you have to clean up the resources
>    yourself.
>    - IMHO, dealing with YAML files in production is bad practice;
>    deploying an application cluster via YAML files at runtime is not a good
>    approach. (How exactly? Terraform, Kubernetes client, kubectl?)
>
> Native K8s *is the right* approach since it terminates all dynamic
> resources; however, the documentation shows only a deployment via the
> Flink CLI, which again is not good practice in production.
>
> Another solution is to use a Kubernetes operator (e.g.
> https://github.com/wangyang0918/flink-native-k8s-operator); however, the
> operator expects CRDs, which are defined by YAML files, and triggering a
> new Flink app is also done via YAML files.
>
> *Unfortunately, the documentation doesn't elaborate much about the Flink
> Client.*
> After digging into and debugging the operator lib above, I found out that
> the Flink client can deploy an application cluster.
>
> Here is a simple program that creates an *application cluster with
> dynamic resource allocation* (using the Flink client and the Flink
> Kubernetes client) in Java.
>
> *Dependencies* (e.g. flink.version=1.12.1, scala.binary.version=2.12):
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-java</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-core</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> *Java code:*
>
> import lombok.extern.log4j.Log4j2;
> import org.apache.flink.client.cli.ApplicationDeployer;
> import org.apache.flink.client.deployment.ClusterClientServiceLoader;
> import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
> import org.apache.flink.client.deployment.application.ApplicationConfiguration;
> import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
>
> @Log4j2
> public class TestFlinkClientToAppCluster {
>
>     public static void main(String[] args) throws InterruptedException {
>         log.info("Client started");
>         // Deploy the application cluster through the Flink client.
>         final ClusterClientServiceLoader clusterClientServiceLoader =
>                 new DefaultClusterClientServiceLoader();
>         final ApplicationDeployer deployer =
>                 new ApplicationClusterDeployer(clusterClientServiceLoader);
>         // No program arguments and no explicit entry class name.
>         final ApplicationConfiguration applicationConfiguration =
>                 new ApplicationConfiguration(new String[]{}, null);
>         try {
>             deployer.run(getEffectiveConfig(), applicationConfiguration);
>             log.info("Finished");
>         } catch (Exception e) {
>             log.error("Failed to deploy cluster {}", "flink-test", e);
>         }
>     }
> }
>
> where getEffectiveConfig() returns a Configuration object that represents
> a flink-conf.yaml file with all necessary parameters.
>
> For example:
>
> // You can also load it from a file: GlobalConfiguration.loadConfiguration(<path>);
> Configuration effectiveConfig = new Configuration();
> URI uri = new URI("local://path/to/artifact");
>
> effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, "default");
> effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, "flink-test");
> effectiveConfig.set(DeploymentOptions.TARGET, "kubernetes-application");
> effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
>         KubernetesConfigOptions.ServiceExposedType.ClusterIP);
> effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE,
>         "app-cluster-job-manager:1.0.1");
> effectiveConfig.set(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH,
>         "/docker-entrypoint.sh");
> effectiveConfig.set(PipelineOptions.JARS,
>         Collections.singletonList(uri.toString()));
>
> If specific configurations for the TM or JM are needed, you can configure
> them like this:
>
> effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1024m");
> effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
> // Note: the CPU options are expressed in CPU cores; adjust to your cluster.
> effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 500.0);
> effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "2048m");
> effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 500.0);
>
> If the application runs outside K8s, it might throw an error indicating
> that it failed to find the REST endpoint (it looks for the K8s format
> <cluster-name>-rest.<namespace>), but the cluster will be deployed anyway.
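
For illustration, the in-cluster REST address mentioned above can be built as follows. This is a minimal sketch and not Flink code: `RestEndpointSketch`, `serviceHost` and `restUri` are hypothetical names, and port 8081 is an assumption (it is Flink's default REST port unless `rest.port` is overridden). The relevant point is that `<cluster-name>-rest.<namespace>` is a Kubernetes service DNS name, which normally resolves only from inside the cluster, so a client running outside K8s would not be expected to reach it.

```java
import java.net.URI;

/** Hypothetical helper (not part of Flink) for the address format quoted in the thread. */
final class RestEndpointSketch {

    // Assumption: 8081 is the REST port; change it if rest.port is overridden.
    static final int ASSUMED_REST_PORT = 8081;

    /** Returns "<cluster-name>-rest.<namespace>". */
    static String serviceHost(String clusterId, String namespace) {
        return clusterId + "-rest." + namespace;
    }

    /** Builds the HTTP URI a client inside the cluster would use for the REST API. */
    static URI restUri(String clusterId, String namespace) {
        return URI.create("http://" + serviceHost(clusterId, namespace) + ":" + ASSUMED_REST_PORT);
    }

    public static void main(String[] args) {
        // Same cluster id and namespace as in the configuration example above.
        System.out.println(restUri("flink-test", "default"));
        // prints http://flink-test-rest.default:8081
    }
}
```

With the ClusterIP exposed type used in the example, reaching this address from a workstation would typically need something like a port-forward or a different exposed type.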
>
> I deployed the application on K8s (docker-desktop).
>
>    - Make sure you provide the kube config. Since the Flink Kubernetes
>    client is running within K8s, it needs credentials to communicate with
>    the API server and create dynamic resources; otherwise, the following
>    Java exception will be raised:
>    sun.security.provider.certpath.SunCertPathBuilderException: unable to find
>    valid certification path to requested target
>
> Dockerfile for the program above (config is the local Kubernetes
> credentials file located in ~/.kube/config; I copied it to the project dir):
>
> FROM openjdk:11-jdk-slim-buster
> COPY target/<jar-name>.jar /
> RUN mkdir -p /opt/files
> ADD log4j2.xml /opt/files/log4j2.xml
> ADD config /opt/files/config
> ENV KUBECONFIG=/opt/files/config
> ENTRYPOINT ["java", "-Dlog4j.configurationFile=/opt/files/log4j2.xml", "-jar", "/<jar-name>.jar"]
>
> Hope that helps,
>
> Tamir.
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, March 11, 2021 3:40 PM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Application cluster - Best Practice
>
> *EXTERNAL EMAIL*
>
> Hi Tamir,
>
> 1. How you start a cluster in application mode depends on how you deploy
> Flink. If you want to use Flink's standalone application mode, then you
> have to deploy a new cluster for every job you want to run [1, 2, 3]. What
> a standalone application cluster needs is the user code jar and a pointer
> to the class to run. So if you want to deploy a standalone cluster on
> Docker or Kubernetes, then you either add the user code to your image,
> mount it as a volume or specify an init container (K8s case) to download
> the user code jar from somewhere.
>
> If you want to deploy on K8s and are able to use Flink's native
> integration (meaning that Flink can talk with the K8s cluster), you can
> also use the client to submit a new application cluster [4].
> But also here, you have to make the user code available to your container
> in some way.
>
> 2. If you deploy your application cluster in standalone mode, then Flink
> won't automatically terminate the TaskManagers. This only works when using
> one of Flink's active deployments (K8s or Yarn). Hence, you either have to
> wait for the registration timeout of the TaskManager or stop them
> explicitly.
>
> 3. No, TaskManagers don't have to stay alive. They stayed alive because
> you deployed Flink in standalone mode, where Flink cannot start and stop
> TaskManagers.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#application-mode-on-docker
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/#application-mode
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#application-mode
>
> Cheers,
> Till
>
> On Sat, Mar 6, 2021 at 6:16 PM Tamir Sagi <tamir.s...@niceactimize.com> wrote:
>
> Hey All,
>
> I'm running an application cluster (Flink 1.11.1) on top of
> Kubernetes (currently running locally using docker-desktop), deployed via
> a Terraform module (2 task managers).
>
> I followed these (well-written) instructions:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
>
> I have successfully executed the batch job.
> (The Kubernetes Job is marked as completed.)
>
> I was reading the following article:
> https://flink.apache.org/news/2020/07/14/application-mode.html
> (Application Deployment in Flink: Current State and the new Application
> Mode, 14 Jul 2020, Kostas Kloudas)
>
> I have several questions:
>
> 1. What is the best practice to trigger jobs on the fly? In other
> words, how do I submit new jobs at runtime in application mode? (In a
> session cluster I could submit jobs via the Flink client.)
>
> 2. Once the job is completed, I get the following message inside the
> Task Manager:
>
> *[2021-03-06T17:06:55,054][Info] {} [o.a.f.r.t.TaskExecutor]: Could not
> resolve ResourceManager address
> akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*,
> retrying in 10000 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*.*
>
> It keeps trying to register with the resource manager until it crashes
> with the error:
>
> *[2021-03-06T17:10:47,437][Error] {} [o.a.f.r.t.TaskManagerRunner]: Fatal
> error occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> Could not register at the ResourceManager within the specified maximum
> registration duration 300000 ms. This indicates a problem with this
> instance. Terminating now.*
>
> Is that normal behavior?
>
> 3. In application mode, does the Task Manager have to stay alive after
> the job has been completed?
>
> Thanks,
> Tamir.
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
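
As a side note on the getEffectiveConfig() step discussed in this thread: the same settings can be kept in a flink-conf.yaml file and loaded with GlobalConfiguration.loadConfiguration(<path>), as the code comment in the example suggests. Below is a minimal sketch that renders such a file with plain Java. `FlinkConfSketch` and its methods are hypothetical names, the keys are the flink-conf.yaml keys that I believe correspond to the options used in the thread (please double-check them against the configuration docs for your Flink version), and the jar path in `main` is a made-up example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): renders "key: value" lines for a flink-conf.yaml. */
final class FlinkConfSketch {

    /** Collects the settings from the thread's example, keyed by their assumed config keys. */
    static Map<String, String> applicationClusterSettings(
            String clusterId, String namespace, String image, String jarUri) {
        Map<String, String> conf = new LinkedHashMap<>(); // keeps insertion order
        conf.put("kubernetes.namespace", namespace);
        conf.put("kubernetes.cluster-id", clusterId);
        conf.put("execution.target", "kubernetes-application");
        conf.put("kubernetes.rest-service.exposed.type", "ClusterIP");
        conf.put("kubernetes.container.image", image);
        conf.put("pipeline.jars", jarUri);
        conf.put("jobmanager.memory.process.size", "1024m");
        conf.put("taskmanager.memory.process.size", "2048m");
        conf.put("parallelism.default", "1");
        return conf;
    }

    /** Renders one "key: value" line per entry, in insertion order. */
    static String toFlinkConfYaml(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        conf.forEach((key, value) -> sb.append(key).append(": ").append(value).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        // The jar URI below is a made-up example path inside the image.
        Map<String, String> conf = applicationClusterSettings(
                "flink-test", "default", "app-cluster-job-manager:1.0.1",
                "local:///opt/flink/usrlib/job.jar");
        System.out.print(toFlinkConfYaml(conf));
    }
}
```

Keeping the settings in a file rather than in code is only one option; the programmatic Configuration shown in the thread works without any file on disk.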