Hi David,

We made the changes and are now submitting the jobs using the Flink CLI.
Unfortunately, we are still seeing the same error:

Caused by: java.lang.OutOfMemoryError: unable to create new native thread

The behavior is the following: one task manager crashes, and from that point on, submitting new jobs fails with the following error:

Caused by: java.io.IOException: Could not connect to BlobServer at address

Then we saw the native thread error on another task manager. The cluster stays up but runs no jobs until we restart some of the task / job managers.

Our blob-related configuration:
* blob.server.port: 6124
* blob.fetch.num-concurrent: 300
* blob.fetch.retries: 20
* blob.service.cleanup.interval: 10800

Full stack trace of the submission error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
    at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
    ... 11 more
Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
    ... 12 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
    ... 14 more
]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 4 more

Ilan Huchansky
Big data developer

From: Ilan Huchansky <ilan.huchan...@start.io>
Date: Tuesday, 7 December 2021 at 11:22
To: David Morávek <d...@apache.org>
Cc: user@flink.apache.org <user@flink.apache.org>, Start.io SDP <s...@start.io>
Subject: Re: Unable to create new native thread error

Hi David,

In that case, I will start working on using the CLI instead of the REST API right away. I'll update you when I finish.

Thanks for the help,
Ilan.
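The stack trace in the latest message ends in "Could not connect to BlobServer ... Connection refused", raised from the JobManager's JobSubmitHandler while opening a BlobClient. A refused connection usually means nothing was accepting connections on that address and port at that moment, so one quick check is whether the blob port (6124 in the configuration above) accepts TCP connections, ideally run from the JobManager host or container. The following is a minimal sketch under that assumption, using only java.net; the class name BlobPortCheck and the default host are hypothetical. It only tests that a TCP connection is accepted and does not speak the BLOB protocol, so the server side may log the aborted connection.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BlobPortCheck {

    /**
     * Returns true if a plain TCP connection to host:port is accepted within
     * timeoutMillis; false on "Connection refused", timeout, or an unknown host.
     */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // ConnectException, SocketTimeoutException and UnknownHostException all land here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults: pass the real JobManager host as the first argument.
        // 6124 is the blob.server.port value from the configuration above.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6124;
        boolean ok = isReachable(host, port, 2_000);
        System.out.println(host + ":" + port + (ok ? " accepts connections" : " is NOT accepting connections"));
    }
}
```

Running it right after a task manager crash, and again after the restart that brings the cluster back, would show whether the blob port itself stops accepting connections in between.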
From: David Morávek <d...@apache.org>
Date: Monday, 6 December 2021 at 10:34
To: Ilan Huchansky <ilan.huchan...@start.io>
Cc: user@flink.apache.org <user@flink.apache.org>, Start.io SDP <s...@start.io>
Subject: Re: Unable to create new native thread error

Hi Ilan,

I think so. Using the CLI instead of the REST API should solve this, as the user code execution would be pulled out into a separate JVM. If you're going to try that, it would be great to hear back whether it has solved your issue.

As for 1.13.4, there is currently no ongoing effort / concrete plan for the release.

Best,
D.

On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky <ilan.huchan...@start.io> wrote:

Hi David,

Thanks for your fast response.

Do you think that changing the submission method, i.e. using the CLI instead of the REST API, could solve the problem?

Another question: I see that the most critical issue (FLINK-25022) is in progress and should be released with version 1.13.4. Do you know when this version is planned to be released?

Thanks again,
Ilan.

From: David Morávek <d...@apache.org>
Date: Thursday, 2 December 2021 at 17:25
To: Ilan Huchansky <ilan.huchan...@start.io>
Cc: user@flink.apache.org <user@flink.apache.org>, Start.io SDP <s...@start.io>
Subject: Re: Unable to create new native thread error

Hi Ilan,

we are aware of multiple issues where web submission can result in classloader / thread-local leaks, which could potentially result in the behavior you're describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one, leaking thread locals.
FLINK-25027 [2]: Only a memory improvement for a particular situation (a lot of small batch jobs); it can be accounted for when setting the Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via the REST API (constant overhead for Metaspace).

In general, web submission differs from a normal submission in that the "main method" of the uploaded jar is executed on the JobManager, and it's really hard to isolate its execution from possible side effects.

[1] https://issues.apache.org/jira/browse/FLINK-25022
[2] https://issues.apache.org/jira/browse/FLINK-25027
[3] https://issues.apache.org/jira/browse/FLINK-25023

Best,
D.

On Thu, Dec 2, 2021 at 3:51 PM Ilan Huchansky <ilan.huchan...@start.io> wrote:

Hi Flink mailing list,

I am Ilan from the Start.io data platform team, and I need some guidance.

We have a flow with the following use case:
* We read files from AWS S3 buckets, process them on our cluster, and sink the data into files using the Flink file sink.
* The jobs always use the same jar; we uploaded it to every job manager in the cluster.
* We are submitting jobs constantly through the REST API.
* Each job reads one or more files from S3.
* The jobs can run from 20 seconds up to 3.5 hours.
* The jobs run in batch mode.
* Running Flink 1.13.1.
* We are running in cluster mode using Docker; the same machines are used for both the task and job managers.

We are struggling with the same error over and over again, and we encounter it in both the job manager and the task manager. After the cluster has been running for a while, with jobs finishing correctly, the task and job managers fail to operate due to:

Caused by: java.lang.OutOfMemoryError: unable to create new native thread

We also see some sporadic java.lang.NoClassDefFoundError failures; not sure whether they are related.
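"unable to create new native thread" is thrown when the JVM asks the operating system for a new thread and the request is refused, so a useful first step is to watch whether a JobManager's or TaskManager's live thread count keeps climbing from job to job instead of returning to a baseline. A minimal sketch of reading those counts through the standard java.lang.management API (ThreadMXBean) follows; the class name ThreadCountProbe and the 50 deliberately blocked threads, which stand in for leaked threads, are illustrative only. The same ThreadMXBean attributes can also be read from a running JVM over JMX, if remote JMX is enabled for it.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class ThreadCountProbe {

    private static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

    /** Current number of live threads (daemon and non-daemon) in this JVM. */
    public static int liveThreads() {
        return THREADS.getThreadCount();
    }

    /** Highest live-thread count seen since JVM start (or since the peak was last reset). */
    public static int peakThreads() {
        return THREADS.getPeakThreadCount();
    }

    /** Starts n daemon threads that block until the latch is released, simulating a leak. */
    public static void startBlockedThreads(int n, CountDownLatch release) {
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "probe-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    public static void main(String[] args) {
        CountDownLatch release = new CountDownLatch(1);
        int before = liveThreads();
        startBlockedThreads(50, release);
        int during = liveThreads();
        System.out.printf("live before=%d, live during=%d, peak=%d%n", before, during, peakThreads());
        release.countDown(); // let the simulated "leaked" threads finish
    }
}
```

Sampling the count after each finished job makes a leak visible as a staircase long before the operating system limit is reached.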
Our setup and configuration are as follows:
* 5-node cluster running on Docker
* Relevant memory config:
  jobmanager.memory.heap.size: 1600m
  taskmanager.memory.process.size: 231664m
  taskmanager.memory.network.fraction: 0.3
  taskmanager.memory.jvm-metaspace.size: 10g
  jobmanager.memory.jvm-metaspace.size: 2g
  taskmanager.memory.framework.off-heap.size: 1g
* Host details:
  max locked memory (kbytes, -l) 65536
  max memory size (kbytes, -m) unlimited
  open files (-n) 1024
  max user processes (-u) 1547269
  virtual memory (kbytes, -v) unlimited
  file locks (-x) unlimited
  cat /proc/sys/kernel/threads-max: 3094538
  kernel.pid_max = 57344

We tried increasing the max user processes, and also both increasing and decreasing the JVM metaspace.

Should we keep increasing the max number of processes on the host? Is there a way to limit the number of threads through the Flink config? What should we do? Any insights?

I can provide more information as needed.

Thanks in advance,
Ilan
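Assuming the usual Linux accounting, where every thread is a kernel task that takes an ID from the same space as process IDs, the threads of all JVMs on a host (here both the job manager and the task manager share each machine) are bounded by roughly the smallest of kernel.threads-max, kernel.pid_max and, per user, ulimit -u. The sketch below simply picks the lowest of the three values reported above; the class and method names are hypothetical, it does not read /proc itself, and a container-level pids limit, if Docker is configured with one, is not included. With these numbers kernel.pid_max (57344) is far below the other two, which suggests that raising max user processes further may not change when the error appears.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ThreadLimits {

    /**
     * Returns the entry with the smallest value, i.e. the limit a growing
     * thread count would hit first. Ties go to the entry inserted first.
     */
    public static Map.Entry<String, Long> bindingLimit(Map<String, Long> limits) {
        Map.Entry<String, Long> lowest = null;
        for (Map.Entry<String, Long> e : limits.entrySet()) {
            if (lowest == null || e.getValue() < lowest.getValue()) {
                lowest = e;
            }
        }
        return lowest;
    }

    /** The limits reported in the host details above. */
    public static Map<String, Long> reportedLimits() {
        Map<String, Long> limits = new LinkedHashMap<>();
        limits.put("kernel.threads-max", 3_094_538L);
        limits.put("kernel.pid_max", 57_344L);
        limits.put("ulimit -u (max user processes)", 1_547_269L);
        return limits;
    }

    public static void main(String[] args) {
        Map.Entry<String, Long> first = bindingLimit(reportedLimits());
        // Prints: Lowest ceiling: kernel.pid_max = 57344
        System.out.println("Lowest ceiling: " + first.getKey() + " = " + first.getValue());
    }
}
```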