Hi Ilan, can you please check the number of threads on the task managers / OS?
As far as I remember, this happens when the system cannot create any more
threads (there is a system-wide limit, */proc/sys/kernel/threads-max* [1]).
Please note that the limit might be exhausted by other processes.

[1] https://man7.org/linux/man-pages/man5/proc.5.html
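
For example, something along these lines should show how close you are to the
limit (just a sketch, the exact commands may need adjusting for your
environment):

    # total number of threads currently alive on the host
    ps -eLf | wc -l

    # system-wide thread limit
    cat /proc/sys/kernel/threads-max

    # per-user limit on processes / threads for the user running the containers
    ulimit -u

    # thread count of a single task-manager JVM (<TM_PID> is a placeholder)
    grep Threads /proc/<TM_PID>/status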

D.

On Sun, Dec 12, 2021 at 2:53 PM Ilan Huchansky <ilan.huchan...@start.io>
wrote:

> Hi David,
>
>
>
> Sorry for the previous mail; I sent it before it was finished, please ignore it.
>
>
>
> We made the changes and are now submitting the jobs using the Flink CLI.
>
> To be more specific –
>
> We use a Docker container with a Flink image that includes the Flink CLI. We
> submit the jobs with the run command, specifying the job manager we want to
> submit to. We call this the submitter.
> As explained in previous mails, the job managers and task managers run in
> Docker on separate machines (each machine has one task manager and one job
> manager), and they are also separate from the submitter.
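>
> For reference, the submission from the submitter container looks roughly like
> this (the host, port, jar path and arguments are placeholders, not our real
> values):
>
>     flink run \
>       -m <jobmanager-host>:8081 \
>       -d \
>       /path/to/our-job.jar <job args>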
>
>
>
> Unfortunately, we are still seeing the same error.
>
> Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
>
> The behavior is the following:
>
> One task manager crashes; from that point on, submitting new jobs fails with
> the following error:
>
> Caused by: java.io.IOException: Could not connect to BlobServer at address
>
> Then we see the native thread error on another task manager.
>
>
>
> The cluster stays up without running any jobs until we restart the task / job
> managers.
>
>
>
> Our blob related configuration:
>
>    - blob.server.port: 6124
>    - blob.fetch.num-concurrent: 300
>    - blob.fetch.retries: 20
>    - blob.service.cleanup.interval: 10800
>
>
>
> Full stack trace of the submission error:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
>     at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
>     at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
>     at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
>     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
>     ... 11 more
> Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT
>     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
>     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
>     ... 12 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:607)
>     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
>     ... 14 more
> ]
>     at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     ... 4 more
>
>
>
> Could the blob error lead to the native thread error?
>
> Are there any configuration changes we can make to avoid the blob-related
> errors?
>
> Can the submission method we are currently using trigger the native thread
> error?
>
>
>
>
> Thanks,
> Ilan.
>
>
>
>
>
> *From: *Ilan Huchansky <ilan.huchan...@start.io>
> *Date: *Tuesday, 7 December 2021 at 11:22
> *To: *David Morávek <d...@apache.org>
> *Cc: *user@flink.apache.org <user@flink.apache.org>, Start.io SDP <
> s...@start.io>
> *Subject: *Re: Unable to create new native thread error
>
> Hi David,
>
>
>
> In that case, I will start working on using the CLI instead of the REST
> API right away.
>
>
>
> Will update you when I finish.
>
>
>
> Thanks for the help,
>
> Ilan.
>
>
>
>
>
> *From: *David Morávek <d...@apache.org>
> *Date: *Monday, 6 December 2021 at 10:34
> *To: *Ilan Huchansky <ilan.huchan...@start.io>
> *Cc: *user@flink.apache.org <user@flink.apache.org>, Start.io SDP <
> s...@start.io>
> *Subject: *Re: Unable to create new native thread error
>
> Hi Ilan,
>
>
>
> I think so; using the CLI instead of the REST API should solve this, as the
> user code execution would be pulled out into a separate JVM. If you're going
> to try that, it would be great to hear back whether it solves your issue.
>
>
>
> As for 1.13.4, there is currently no ongoing effort / concrete plan for the
> release.
>
>
>
> Best,
>
> D.
>
>
>
> On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky <ilan.huchan...@start.io>
> wrote:
>
> Hi David,
>
>
>
> Thanks for your fast response.
>
>
>
> Do you think that changing the submission method, i.e. using the CLI instead
> of the REST API, could solve the problem?
>
>
>
> Another question: I see that the most critical issue (FLINK-25022) is in
> progress and should be released with version 1.13.4; do you know when this
> version is planned to be released?
>
>
>
> Thanks again,
>
> Ilan.
>
>
>
> *From: *David Morávek <d...@apache.org>
> *Date: *Thursday, 2 December 2021 at 17:25
> *To: *Ilan Huchansky <ilan.huchan...@start.io>
> *Cc: *user@flink.apache.org <user@flink.apache.org>, Start.io SDP <
> s...@start.io>
> *Subject: *Re: Unable to create new native thread error
>
> Hi Ilan,
>
>
>
> we are aware of multiple issues where web submission can result in
> classloader / thread-local leaks, which could potentially result in the
> behavior you're describing. We're working on addressing them.
>
>
>
> FLINK-25022 [1]: The most critical one, leaking thread locals.
> FLINK-25027 [2]: Only a memory improvement for a particular situation (a lot
> of small batch jobs); it can be accounted for when setting the Metaspace size.
> FLINK-25023 [3]: Can leak the classloader of the first job submitted via the
> REST API (a constant overhead for the Metaspace).
>
>
>
> In general, web submission differs from a normal submission in that the "main
> method" of the uploaded jar is executed on the JobManager, and it is really
> hard to isolate its execution from possible side effects.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-25022
>
> [2] https://issues.apache.org/jira/browse/FLINK-25027
>
> [3] https://issues.apache.org/jira/browse/FLINK-25023
>
>
>
> Best,
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 3:51 PM Ilan Huchansky <ilan.huchan...@start.io>
> wrote:
>
> *Hi Flink mailing list,*
>
>
>
> I am Ilan from the Start.io data platform team, and I need some guidance.
>
>
>
> We have a flow with the following use case:
>
>
>
>    - We read files from AWS S3 buckets, process them on our cluster, and
>    sink the data into files using the Flink file sink.
>    - The jobs always use the same jar; we uploaded it to every job manager
>    on the cluster.
>    - We are submitting jobs constantly through the REST API (a rough sketch
>    of the calls follows this list).
>    - Each job reads one or more files from S3.
>    - The jobs can run from 20 seconds up to 3.5 hours.
>    - The jobs run in batch mode.
>    - We are running Flink 1.13.1.
>    - We are running in cluster mode using Docker; the same machines are used
>    for the task and job managers.
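>
> A rough sketch of what our REST submission looks like (the host, port, paths
> and class name below are placeholders, and we assume the standard /jars
> endpoints of the Flink REST API):
>
>     # upload the job jar to the job manager (returns a jar id)
>     curl -X POST -H "Expect:" \
>       -F "jarfile=@/path/to/our-job.jar" \
>       http://<jobmanager-host>:8081/jars/upload
>
>     # run the uploaded jar; this executes the jar's main method on the JobManager
>     curl -X POST -H "Content-Type: application/json" \
>       -d '{"entryClass": "com.example.OurJob", "programArgsList": ["--input", "s3://bucket/file"]}' \
>       http://<jobmanager-host>:8081/jars/<jar-id>/run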
>
>
>
 We keep struggling with the same error over and over again; we encounter it
> in both the job manager and the task manager.
>
>
>
> After the cluster has been running for a while and jobs have been finishing
> correctly, the task and job managers fail to operate due to:
>
> Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
>
>
>
>
>
> We also see some sporadic java.lang.NoClassDefFoundError failures; we are not
> sure whether they are related.
>
>
>
> Our setup and configuration are as follows:
>
> · A 5-node cluster running on Docker
>
> · Relevant memory config:
>
> jobmanager.memory.heap.size: 1600m
>
> taskmanager.memory.process.size: 231664m
>
> taskmanager.memory.network.fraction: 0.3
>
> taskmanager.memory.jvm-metaspace.size: 10g
>
> jobmanager.memory.jvm-metaspace.size: 2g
>
> taskmanager.memory.framework.off-heap.size: 1g
>
>
>
> · Host details:
>
> max locked memory  (kbytes, -l) 65536
>
> max memory size       (kbytes, -m) unlimited
>
> open files                     (-n) 1024
>
> max user processes    (-u) 1547269
>
> virtual memory           (kbytes, -v) unlimited
>
> file locks                       (-x) unlimited
>
>
>
> cat /proc/sys/kernel/threads-max: 3094538
>
> kernel.pid_max = 57344
>
>
>
>
>
> We tried increasing the max user processes, and also increasing and
> decreasing the JVM Metaspace size.
>
>
>
> Should we keep increasing the max number of processes on the host? Is there a
> way to limit the number of threads via the Flink configuration?
>
>
>
> What should we do? Any insights?
> I can provide more information as needed.
>
>
>
> Thanks in advance
>
>
>
>  Ilan
>
>
>
>
