Hi all,
I have one Flink job that reads data from one Kafka topic and writes to two Kafka topics using Flink SQL. The code is something like this:

    tableEnv.executeSql(
      """
      create table sink_table1 (
        xxx xxx
      ) with (
        'connector' = 'kafka',
        'topic' = 'topic1'
      )
      """
    )

    tableEnv.executeSql(
      """
      create table sink_table2 (
        xxx xxx
      ) with (
        'connector' = 'kafka',
        'topic' = 'topic2'
      )
      """
    )

The code works well in local minicluster mode. But when I deploy it on YARN, one application runs fine and the other fails with the following error:

    022-01-24 11:16:25,374 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
    org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569) [flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99) [flink-dist_2.11-1.12.2.jar:1.12.2]
    Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
        at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_161]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ~[hadoop-common-2.7.3.jar:?]
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        ... 2 more
    Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 42888
        at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_161]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ~[hadoop-common-2.7.3.jar:?]
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        ... 2 more

It seems that the two sink tables trigger two YARN applications, and each application starts its own JobManager. Both JobManagers try to bind the same REST port (42888) on the same host, so only one application can start successfully and the other fails.

Is there anything I can do to start both applications?

Thanks,
Xuekui
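
P.S. The "port range 42888" in the BindException suggests that the REST bind port is pinned to a single value somewhere in the cluster configuration. That is an assumption, since the flink-conf.yaml for this deployment is not shown here. If so, a minimal sketch of a setting that would let a second JobManager on the same host pick a different port is below; the range 50100-50200 is only an illustrative choice, not a value taken from this setup.

```yaml
# flink-conf.yaml (sketch; assumes the fixed port 42888 comes from this file)
# rest.bind-port accepts a single port, a list, or a range.
# With a range, each JobManager binds the first free port it finds.
rest.bind-port: 50100-50200
```

Any ports in the chosen range would have to be open between the YARN nodes and whatever client accesses the REST endpoint.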