Hi all,

I have one flink job which reads data from one kafka topic and sinks to two 
kafka topics using Flink SQL.


The code is something like this:


tableEnv.executeSql(
"""
create table sink_table1 (
xxx
xxx
) with (
    'connector' = 'kafka',
    'topic' = 'topic1'
)
"""
)


tableEnv.executeSql(
"""
create table sink_table2 (
xxx
xxx
) with (
    'connector' = 'kafka',
     'topic' = 'topic2'
)
"""
)



The code works well in local minicluster mode. 
But when I deploy it on yarn, one application is running well and the other 
would fail with the following error


022-01-24 11:16:25,374 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint      
  [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
 [flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
 [flink-dist_2.11-1.12.2.jar:1.12.2]
Caused by: org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
        at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_161]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
 ~[hadoop-common-2.7.3.jar:?]
        at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        ... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in 
port range 42888
        at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_161]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
 ~[hadoop-common-2.7.3.jar:?]
        at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
        ... 2 more



It seems that the two sink tables would trigger two yarn applications and each 
application will start one job manager and two job managers are using the the 
same rest port on same container, so there's only one application can run 
successfully, the other would fail.


Is there anything I can do to start both applications?


Thanks
Xuekui

Reply via email to