Hi Ilan, can you please check the number of threads on the task managers / OS? As far as I remember, this happens when the system cannot create any more threads (there is a system-wide limit, */proc/sys/kernel/threads-max* [1]). Please note that the limit might be exhausted by other processes.
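A quick sketch of the checks I have in mind, assuming the task managers run as ordinary java processes inside their containers (the pid, container name, port and metric name below are placeholders / from memory, so please adapt them to your setup):

    # system-wide: current thread count vs. the kernel limits
    ps -eLf | wc -l
    cat /proc/sys/kernel/threads-max
    cat /proc/sys/kernel/pid_max

    # per task-manager JVM; <TM_PID> is a placeholder, e.g. from `docker top <tm-container>` or `jps`
    grep Threads /proc/<TM_PID>/status
    ls /proc/<TM_PID>/task | wc -l

    # Flink's own view via the REST metrics endpoint (double-check the metric name for your version)
    curl "http://<jobmanager-host>:8081/taskmanagers/<tm-id>/metrics?get=Status.JVM.Threads.Count"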
[1] https://man7.org/linux/man-pages/man5/proc.5.html

D.

On Sun, Dec 12, 2021 at 2:53 PM Ilan Huchansky <ilan.huchan...@start.io> wrote:

> Hi David,
>
> Sorry for the previous mail, I sent it before it was finished, please ignore it.
>
> We made the changes and now submit the jobs using the Flink CLI.
>
> To be more specific: we use Docker, with a Flink image containing the Flink CLI. We submit the jobs with the run command, specifying the job manager we want to submit to. We call this the submitter. As explained in previous mails, job managers and task managers run in Docker on separate machines (each machine has 1 task manager and 1 job manager), also separate from the submitter.
>
> Unfortunately, we are still seeing the same error:
>
> Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
>
> The behavior is the following: one task manager crashes, and from that point on, submitting new jobs fails with the following error:
>
> Caused by: java.io.IOException: Could not connect to BlobServer at address
>
> Then we see the native thread error on another task manager.
>
> The cluster is up without running jobs until we restart the task / job managers.
>
> Our blob-related configuration:
>
> - blob.server.port: 6124
> - blob.fetch.num-concurrent: 300
> - blob.fetch.retries: 20
> - blob.service.cleanup.interval: 10800
>
> Full stack trace of the submission error:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
>     at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
>     at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
>     at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
>     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
>     ... 11 more
> Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT
>     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
>     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
>     ... 12 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:607)
>     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
>     ... 14 more
> ]
>     at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     ... 4 more
>
> Could the blob error lead to the native thread error?
> Are there any changes to the configuration that we can make to avoid the blob-related errors?
> Can the current submission method that we are using trigger the native thread error?
>
> Thanks,
> Ilan.
>
>
> *From: *Ilan Huchansky <ilan.huchan...@start.io>
> *Date: *Tuesday, 7 December 2021 at 11:22
> *To: *David Morávek <d...@apache.org>
> *Cc: *user@flink.apache.org <user@flink.apache.org>, Start.io SDP <s...@start.io>
> *Subject: *Re: Unable to create new native thread error
>
> Hi David,
>
> In that case, I will start working on using the CLI instead of the REST API right away.
>
> Will update you when I finish.
>
> Thanks for the help,
> Ilan.
>
>
> *From: *David Morávek <d...@apache.org>
> *Date: *Monday, 6 December 2021 at 10:34
> *To: *Ilan Huchansky <ilan.huchan...@start.io>
> *Cc: *user@flink.apache.org <user@flink.apache.org>, Start.io SDP <s...@start.io>
> *Subject: *Re: Unable to create new native thread error
>
> Hi Ilan,
>
> I think so, using the CLI instead of the REST API should solve this, as the user code execution would be pulled out to a separate JVM. If you're going to try that, it would be great to hear back whether it has solved your issue.
>
> As for 1.13.4, there is currently no ongoing effort / concrete plan for the release.
>
> Best,
> D.
>
> On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky <ilan.huchan...@start.io> wrote:
>
> Hi David,
>
> Thanks for your fast response.
>
> Do you think that changing the submission method could solve the problem? Using the CLI instead of the REST API.
>
> Another question: I see that the most critical issue (FLINK-25022) is in progress and should be released with version 1.13.4. Do you know when this version is planned to be released?
>
> Thanks again,
> Ilan.
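(Side note on the CLI submission discussed above: what I mean is essentially a plain `flink run` pointed at the cluster's REST endpoint; the host, port, jar path and job arguments below are placeholders, so adjust them to your setup:)

    # submit to a specific job manager in detached mode; all values are placeholders
    flink run -m <jobmanager-host>:8081 -d /path/to/job.jar <job args>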
>
> *From: *David Morávek <d...@apache.org>
> *Date: *Thursday, 2 December 2021 at 17:25
> *To: *Ilan Huchansky <ilan.huchan...@start.io>
> *Cc: *user@flink.apache.org <user@flink.apache.org>, Start.io SDP <s...@start.io>
> *Subject: *Re: Unable to create new native thread error
>
> Hi Ilan,
>
> we are aware of multiple issues where web-submission can result in classloader / thread-local leaks, which could potentially result in the behavior you're describing. We're working on addressing them.
>
> FLINK-25022 [1]: The most critical one, leaking thread locals.
> FLINK-25027 [2]: Only a memory improvement for a particular situation (a lot of small batch jobs) and could be fixed by accounting for it when setting the Metaspace size.
> FLINK-25023 [3]: Can leak the classloader of the first job submitted via the REST API (constant overhead for Metaspace).
>
> In general, web-submission is different from a normal submission in that the "main method" of the uploaded jar is executed on the JobManager, and it's really hard to isolate its execution from possible side effects.
>
> [1] https://issues.apache.org/jira/browse/FLINK-25022
> [2] https://issues.apache.org/jira/browse/FLINK-25027
> [3] https://issues.apache.org/jira/browse/FLINK-25023
>
> Best,
> D.
>
> On Thu, Dec 2, 2021 at 3:51 PM Ilan Huchansky <ilan.huchan...@start.io> wrote:
>
> *Hi Flink mailing list,*
>
> I am Ilan from the Start.io data platform team, and I need some guidance.
>
> We have a flow with the following use case:
>
> - We read files from AWS S3 buckets, process them on our cluster, and sink the data into files using the Flink file sink.
> - The jobs always use the same jar; we uploaded it to every job manager on the cluster.
> - We are submitting jobs constantly through the REST API.
> - Each job reads one or more files from S3.
> - The jobs can run from 20 seconds up to 3.5 hours.
> - The jobs run in batch mode.
> - Running Flink 1.13.1.
> - We are running in cluster mode using Docker; the same machines are used for task and job managers.
>
> We are struggling with the same error over and over again. We encounter it in the job manager and in the task manager.
>
> After the cluster has been running for a while and jobs are finishing correctly, the task and job managers fail to operate due to:
>
> Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
>
> We also see some sporadic failures with java.lang.NoClassDefFoundError; not sure it is related.
>
> Our setup and configuration are as follows:
>
> - 5-node cluster running on Docker
> - Relevant memory config:
>   jobmanager.memory.heap.size: 1600m
>   taskmanager.memory.process.size: 231664m
>   taskmanager.memory.network.fraction: 0.3
>   taskmanager.memory.jvm-metaspace.size: 10g
>   jobmanager.memory.jvm-metaspace.size: 2g
>   taskmanager.memory.framework.off-heap.size: 1g
> - Host details:
>   max locked memory (kbytes, -l) 65536
>   max memory size (kbytes, -m) unlimited
>   open files (-n) 1024
>   max user processes (-u) 1547269
>   virtual memory (kbytes, -v) unlimited
>   file locks (-x) unlimited
>
>   cat /proc/sys/kernel/threads-max: 3094538
>   kernel.pid_max = 57344
>
> We tried to increase the max user processes, and also to increase and decrease the JVM Metaspace.
>
> Should we keep increasing the max number of processes on the host? Is there a way to limit the number of threads from the Flink config?
>
> What should we do? Any insights?
>
> I can provide more information as needed.
>
> Thanks in advance,
>
> Ilan
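One more thought on the host details quoted above: those ulimit numbers were taken on the host, so it may also be worth confirming what the task-manager JVM actually sees from inside its container, for example (the container name and pid below are placeholders):

    # effective per-process limits inside the task-manager container; names/pids are placeholders
    docker exec <tm-container> sh -c 'ulimit -u'
    grep -i "max processes" /proc/<TM_PID>/limits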