[ https://issues.apache.org/jira/browse/FLINK-11484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16758848#comment-16758848 ]
Yang Wang commented on FLINK-11484:
-----------------------------------

Blink shares slots by default. You can run the following command to verify that 43 tasks run in 40 slots.

{code:java}
./bin/flink run -m yarn-cluster -ys 4 -yn 10 -ytm 5120 -p 40 ./examples/streaming/WindowJoin.jar
{code}

This works only because quantitative resource management is disabled and the total number of slots is sufficient under slot sharing. When quantitative resource management is enabled, tasks may fail to be allocated because of insufficient resources (CpuCores/UserHeap/UserDirect/UserNative/ManagedMem/NetworkMem). For example, if each operator allocates 1 core, the job needs 43 CPU cores but the Flink session cluster has only 40, so the job cannot run.

> Blink java.util.concurrent.TimeoutException
> -------------------------------------------
>
> Key: FLINK-11484
> URL: https://issues.apache.org/jira/browse/FLINK-11484
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.5.5
> Environment: The link of blink source code: [github.com/apache/flink/tree/blink|https://github.com/apache/flink/tree/blink]
> Reporter: pj
> Priority: Major
> Labels: blink
> Attachments: 1.png
>
>
> *If I run blink application on yarn and the parallelism number larger than 1.*
> *Following is the command :*
> ./flink run -m yarn-cluster -ynm FLINK_NG_ENGINE_1 -ys 4 -yn 10 -ytm 5120 -p 40 -c XXMain ~/xx.jar
> *Following is the code:*
> {{DataStream outputStream = tableEnv.toAppendStream(curTable, Row.class);
> outputStream.print();}}
> *{{The whole subtask of application will hang a long time and finally the }}{{toAppendStream()}}{{ function will throw an exception like below:}}*
> {{org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: f5e4f7243d06035202e8fa250c364304)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:276)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:482)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeInternal(StreamContextEnvironment.java:85)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeInternal(StreamContextEnvironment.java:37)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
> at com.ngengine.main.KafkaMergeMain.startApp(KafkaMergeMain.java:352)
> at com.ngengine.main.KafkaMergeMain.main(KafkaMergeMain.java:94)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:561)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:445)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:786)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1105)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1105)
> Caused by: java.util.concurrent.TimeoutException
> at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)}}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
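
To make the arithmetic in Yang Wang's comment concrete, here is a minimal Java sketch. It is only an illustration under stated assumptions: the per-operator parallelisms {1, 1, 1, 40} are invented so that they sum to the 43 tasks mentioned in the comment (the real WindowJoin job graph may differ), all operators are assumed to sit in one slot sharing group, and the class and method names are hypothetical, not part of Flink or Blink.

```java
import java.util.Arrays;

// Hypothetical helper, not a Flink/Blink API: contrasts slot-count checking
// (slot sharing) with per-task CPU accounting.
class SlotSharingCheck {

    // With every operator in one slot sharing group, the job needs as many
    // slots as its highest operator parallelism.
    static int slotsNeededWithSharing(int[] parallelisms) {
        return Arrays.stream(parallelisms).max().orElse(0);
    }

    // Total number of parallel tasks across all operators.
    static int totalTasks(int[] parallelisms) {
        return Arrays.stream(parallelisms).sum();
    }

    // Slot-count check only: containers (-yn) times slots per container (-ys).
    static boolean fitsBySlots(int[] parallelisms, int containers, int slotsPerContainer) {
        return slotsNeededWithSharing(parallelisms) <= containers * slotsPerContainer;
    }

    // Quantitative check on a single resource (CPU cores), one request per task.
    static boolean fitsByCores(int[] parallelisms, int coresPerTask, int clusterCores) {
        return totalTasks(parallelisms) * coresPerTask <= clusterCores;
    }

    public static void main(String[] args) {
        int[] parallelisms = {1, 1, 1, 40}; // assumed breakdown of the 43 tasks
        System.out.println("tasks = " + totalTasks(parallelisms));
        System.out.println("slots needed = " + slotsNeededWithSharing(parallelisms));
        System.out.println("fits by slots (-yn 10 -ys 4) = " + fitsBySlots(parallelisms, 10, 4));
        System.out.println("fits by cores (1 core/task, 40 cores) = " + fitsByCores(parallelisms, 1, 40));
    }
}
```

Under these assumptions the slot check passes (40 slots needed, 40 available) while the per-core check fails (43 cores requested, 40 available), which matches the two cases described in the comment.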