Hey All, I know the Application cluster mode is not widely used yet; I'm happy to be part of the Flink community, test it, and share the results.
Following my previous email, I'd like to share more information and get your feedback.

Scenario 4: requestJobResult() gets out of sync.
The result is very similar to Scenario #3; here I delved into it and understood the reason. As I mentioned before, once the cluster is up and running, we create a RestClusterClient, fetch listJobs(), and query each job for its result using client.requestJobResult(jobId). requestJobResult(jobId) re-schedules requests until an exception is thrown or the job is completed.
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L651-L681

The polling interval is defined in ExponentialWaitStrategy
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L171
In the RestClusterClient class it is initialized with initialTime = 10L, maxWait = 2000L. The interval is returned from the "sleep" method
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L674
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategy.java#L42-L48
which means that after 8 requests the interval will always be 2 seconds:

1st request is sent after 10ms
2nd request is sent after 20ms
3rd request is sent after 40ms
4th request is sent after 80ms
5th request is sent after 160ms
6th request is sent after 320ms
7th request is sent after 640ms
8th request is sent after 1280ms
9th request is sent after 2000ms
10th request is sent after 2000ms
11th request is sent after 2000ms
... and so on

For example:
[2021-04-28T13:44:25,030][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,045][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,107][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,128][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,144][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,185][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,213][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,294][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,302][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,465][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,517][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,838][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,904][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:26,545][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:26,924][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:28,206][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:28,241][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:30,243][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:30,257][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:32,258][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result

The first few requests are sent way too fast, while the job is still IN_PROGRESS. The real issue is that the job completes and the cluster gets destroyed between the later polling intervals (2s).
This means that on the next request the cluster no longer exists; the client cannot get its status and a java.net.UnknownHostException is eventually raised within the CompletableFuture.

Questions:
* Is it good practice to use RestClusterClient with an Application cluster? One possible solution is to send a message from the job itself upon completion/failure (e.g. to a Kafka topic), which would make the JobResult polling redundant.
* If RestClusterClient is, by design, also suitable for Application clusters, why is the polling interval not configurable? A smaller polling interval might mitigate the issue.

Another question relates to JobInitializationException. I see the stack trace and the exception in the RestClient response, but it is not reflected on the client side, neither as a failure nor as an exception.

Logs Sample
[2021-04-28T14:08:41,456][Error] {} [o.a.f.r.r.RestClient]: Received response was neither of the expected type ([simple type, class org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) nor an error.
Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"4e56d506ca892a3109e48e6a5b804330","application-status":"FAILED","accumulator-results":{},"net-runtime":15540,"failure-cause":{"class":"org.apache.flink.runtime.client.JobInitializationException","stack-trace":"org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.\n\t at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (TextOutputFormat (s3://xsight-aggregated-data/6376f1b0-b406-463b-bb52-2e55c68ec9ec/86979f6c-d023-43a1-a6fc-55202e696e54/aggregations) - UTF-8)': doesBucketExist on xsight-aggregated-data: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null): No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null)\n\t at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:239) at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276) at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39) at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478) ... 
4 more\nCaused by: org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on xsight-aggregated-data: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null): No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null)\n\t at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)\n\t at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)\n\t at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)\n\t at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)

Thank you,
Tamir.

________________________________
From: Tamir Sagi <tamir.s...@niceactimize.com>
Sent: Sunday, April 25, 2021 10:07 PM
To: Yang Wang <danrtsey...@gmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Application cluster - Job execution and cluster creation timeouts

Hey Yang, Community,

As discussed a few weeks ago, I'm working on the Application Cluster - native K8s approach, running Flink 1.12.2. We deploy application clusters programmatically, which works well. In addition, we leverage the Kubernetes client (Fabric8io) to watch the deployment/pod status and get an indication of whether the k8s cluster is up and running.
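For context, the watch-based readiness check amounts to something like the following library-agnostic sketch. This is only an illustration: isClusterReady is a hypothetical stand-in for the Fabric8 deployment/pod status check, and the timeout values are examples, not what we run in production.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

public class ReadinessProbe {

    // Polls the supplied readiness check until it returns true or the
    // deadline passes. isClusterReady stands in for the Fabric8
    // deployment/pod status check we run against the cluster.
    public static boolean waitUntilReady(BooleanSupplier isClusterReady,
                                         Duration timeout,
                                         Duration pollInterval) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (isClusterReady.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollInterval.toMillis());
        }
        // One final check at the deadline before giving up.
        return isClusterReady.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        // A check that never succeeds times out and returns false.
        boolean ready = waitUntilReady(() -> false,
                Duration.ofMillis(200), Duration.ofMillis(50));
        System.out.println("ready=" + ready);
    }
}
```

The drawback mentioned further down in this thread applies here too: the deadline lives in the watching process, so if that process crashes the timeout is gone.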
Job Details
We read a file from S3 using the hadoop-s3 plugin (https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins), process the data, and write it back to S3. We fetch the job list using ClusterClient (client.listJobs()). No savepoints/backend state are configured.

I would like to raise several questions regarding some scenarios I encountered and get your feedback. These scenarios show the Flink Application cluster (native k8s) behavior in case of failures.

Scenario 1: Exception before env.execute() gets called.
I deploy an application cluster and an exception is thrown before env.execute() is called.
Result: Received the exception, but nothing gets cleaned up; the Job Manager pod is still running even though no jobs are running.
Question: How should we get the "real" cluster status? The Job Manager pod is running but the execution never occurred. (Memory leak?)

Scenario 2: Application state FAILED and no running jobs.
env.execute() gets called and an exception is thrown before the job starts. I did not provide AWS credentials and com.amazonaws.AmazonClientException was thrown, which led to "Caused by: org.apache.flink.runtime.client.JobExecutionException". The application state was FAILED but the job list was empty (the jobs never started), even though according to the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/client/program/ClusterClient.html#listJobs--) listJobs() "Lists the currently running and finished jobs on the cluster". I still see debug logs where Flink is aware of that exception and cleans up all the resources:

[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.e.ClusterEntrypoint]: Shutting KubernetesApplicationClusterEntrypoint down with application status FAILED. Diagnostics null.
[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.j.MiniDispatcherRestEndpoint]: Shutting down rest endpoint.
[2021-04-25T13:59:59,767][Info] {} [o.a.f.r.r.a.ActiveResourceManager]: Shut down cluster because application is in FAILED, diagnostics null.
[2021-04-25T13:59:59,768][Info] {} [o.a.f.k.KubernetesResourceManagerDriver]: Deregistering Flink Kubernetes cluster, clusterId: test-flink-app-9645, diagnostics:

Result: the cluster gets destroyed; listJobs() is empty until the client gets an "UnknownHost" exception (the cluster no longer exists).
Question: How can we get the application state from outside the cluster, or catch the JobExecutionException?

Scenario 3: Job starts and throws an exception; job status remains IN_PROGRESS.
Once the job is executed its status changes to IN_PROGRESS. The job list is retrieved (within a few seconds) and for each job we query the job result via clusterClient.requestJobResult(jobId). However, once the job fails the result never changes to FAILED; instead the CompletableFuture gets an exception due to the max number of retries.

Code snippet

try {
    CompletableFuture<JobResult> jobResultFuture = client.requestJobResult(flinkJobId);
    jobResultFuture.thenAccept(jobResult -> handleJobResult(jobResult))
        .exceptionally(throwable -> {
            handleJobResultWithException(flinkJobId, Optional.of(throwable));
            return null;
        });
} catch (Exception e) {
    handleGetJobResultWithException(flinkJobId, Optional.of(e));
}

Stacktrace

Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386) ~[flink-runtime_2.12-1.12.1.jar!/:1.12.1]
    ... 33 more
Caused by: java.util.concurrent.CompletionException: java.net.UnknownHostException: test-flink-app-24569-rest.testing
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    ...
31 more
Caused by: java.net.UnknownHostException: test-flink-app-24569-rest.testing
    at java.net.InetAddress$CachedAddresses.get(Unknown Source) ~[?:?]

Result: the resources get cleaned up, and then the future can no longer get the cluster status; we always end up in the exceptionally clause.
Question: Why is the job result not changed to FAILED? What am I missing?

Highly appreciate your help.
Tamir.

________________________________
From: Yang Wang <danrtsey...@gmail.com>
Sent: Wednesday, April 7, 2021 6:24 AM
To: Tamir Sagi <tamir.s...@niceactimize.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Application cluster - Job execution and cluster creation timeouts

Hi Tamir,

Maybe I did not make myself clear. Here the "deployer" means our internal Flink application deployer (actually it is the Ververica platform), not the ApplicationDeployer interface in Flink. It helps with managing the lifecycle of every Flink application, and it has the same native K8s integration mechanism you have mentioned.

In my opinion, cleaning up an infinitely failing Flink application (e.g. wrong image) is the responsibility of your own deployer, not the Flink client. In such a case, the JobManager usually cannot run normally. However, if the JobManager starts successfully, it will clean up all the K8s resources once all the jobs reach a terminal status (e.g. FAILED, CANCELED, FINISHED). Even if the JobManager crashes, it can recover the jobs from the latest checkpoint if HA [1] is enabled.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/overview/

Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Tuesday, April 6, 2021 at 6:43 PM:

Hey Yang,

Thank you for your response. We run the application cluster programmatically.
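(Side note on the UnknownHostException in Scenario 3 above: since it arrives wrapped in CompletionException layers, one way for the exceptionally handler to tell "cluster already torn down" apart from other failures is to walk the cause chain. A minimal sketch; isClusterGone is our own hypothetical helper, not a Flink API.)

```java
import java.net.UnknownHostException;
import java.util.concurrent.CompletionException;

public class FailureClassifier {

    // Walks the cause chain of a failed future's throwable and reports
    // whether it bottoms out in an UnknownHostException, which in our setup
    // means the cluster's rest service has already been torn down.
    public static boolean isClusterGone(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof UnknownHostException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new CompletionException(
                new UnknownHostException("test-flink-app-24569-rest.testing"));
        System.out.println(isClusterGone(wrapped)); // prints true
    }
}
```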
I discussed it here, with an example of how to run it from Java rather than the CLI:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Application-cluster-Best-Practice-td42011.html

Following your comment about the accessibility of the Flink rest endpoint ("When it is not ready in the timeout (e.g. 120s), the deployer will delete the Flink JobManager deployment and try to create a new one"): I have not actually seen this in action. I gave a non-existing image; the deployer started the k8s deployment, but the pods failed to start (expected) and the k8s deployment kept running indefinitely. What configuration is that? Is it possible to override it?

I delved into the flink-core and flink-kubernetes jars. Since Flink depends on Kubernetes, we both need to leverage the Kubernetes client (which Flink does internally) to manage and inspect the resources.

Regarding "I am curious why you have 'infinite job execution' in your Flink application cluster": my thought was about what happens if there is a bug and the job runs indefinitely, or the JobManager crashes over and over again. What happens if resources don't get cleaned up properly? We don't want to keep the cluster up and running in that case and would like to get an indication. Since Flink does not support that, we have to inspect it externally (which makes it more complex). We could also poll the job status using the Flink client, but that becomes useless if the job runs indefinitely.

What do you think?

Best,
Tamir.
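One way to bound the "useless if the job runs indefinitely" polling described above is to cap the job-result future with an external deadline via CompletableFuture.orTimeout (Java 9+), so the watcher gets a TimeoutException instead of waiting forever. A sketch under that assumption; the never-completing placeholder below stands in for client.requestJobResult(jobId):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedJobResultWait {

    // Caps a (possibly never-completing) future with an external deadline.
    // In the real code the input would be client.requestJobResult(jobId).
    public static <T> CompletableFuture<T> withDeadline(CompletableFuture<T> future,
                                                        long timeout, TimeUnit unit) {
        return future.orTimeout(timeout, unit);
    }

    public static void main(String[] args) {
        // A placeholder standing in for the job-result future.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        withDeadline(neverCompletes, 100, TimeUnit.MILLISECONDS)
                .exceptionally(t -> {
                    if (t instanceof TimeoutException || t.getCause() instanceof TimeoutException) {
                        return "job result not available within the deadline";
                    }
                    return "failed: " + t;
                })
                .thenAccept(System.out::println) // prints the timeout message after ~100 ms
                .join();
    }
}
```

This only bounds the wait on the client side, so it shares the caveat from the thread: if the watching process crashes, the deadline is gone with it.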
________________________________
From: Yang Wang <danrtsey...@gmail.com>
Sent: Tuesday, April 6, 2021 10:36 AM
To: Tamir Sagi <tamir.s...@niceactimize.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Application cluster - Job execution and cluster creation timeouts

Hi Tamir,

Thanks for trying the native K8s integration.

1. We do not have a timeout for creating the Flink application cluster. The reason is that the job submission happens on the JobManager side, so the Flink client does not need to wait for the JobManager to be running before exiting. I think even if the Flink client internally had the timeout, we would still have the same problem: when the Flink client crashes, the timeout is gone. I want to share another solution for the timeout. In our deployer, when a new Flink application is created, the deployer periodically checks the accessibility of the Flink rest endpoint. When it is not ready within the timeout (e.g. 120s), the deployer deletes the Flink JobManager deployment and tries to create a new one.

2. Actually, the current "flink run-application" does not support a real attached mode (waiting for all the jobs in the application to finish). I am curious why you have "infinite job execution" in your Flink application cluster. If all the jobs in the application finished, Flink will deregister the application and all the K8s resources should be cleaned up.

Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Monday, April 5, 2021 at 11:24 PM:

Hey all,

We deploy an application cluster natively on Kubernetes. Are there any timeouts for job execution and cluster creation?
I went over the configuration page here <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html> but did not find anything relevant.

In order to get an indication about the cluster, we leverage the k8s client <https://github.com/fabric8io/kubernetes-client/blob/master/doc/CHEATSHEET.md#pods> to watch the deployment <https://github.com/fabric8io/kubernetes-client/blob/master/doc/CHEATSHEET.md#deployment> in a namespace with a specific cluster name and respond accordingly. We define two timeouts:
1. Creating the application cluster (i.e. to date, if there are errors in the pods, the k8s deployment is up but the application cluster is not running).
2. Until the application cluster resources get cleaned up (upon completion), which guards against an infinite job execution or k8s glitches.

However, this solution is not ideal, because if this client lib crashes, the timeouts are gone. We don't want to manage these timeout states ourselves. Any suggestion or better way?

Thanks,
Tamir.

Confidentiality: This communication and any attachments are intended for the above-named persons only and may be confidential and/or legally privileged. Any opinions expressed in this communication are not necessarily those of NICE Actimize. If this communication has come to you in error you must take no action based on it, nor must you copy or show it to anyone; please delete/destroy and inform the sender by e-mail immediately. Monitoring: NICE Actimize may monitor incoming and outgoing e-mails. Viruses: Although we have taken steps toward ensuring that this e-mail and attachments are free from any virus, we advise that in keeping with good computing practice the recipient should ensure they are actually virus free.