I tried setting that option, but it did not work.

2020-02-07 19:28:45,999 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 32fb9e7dcc9d41917bce38a2d5bb0093 (akka.tcp://flink@ip-1:34718/user/taskmanager_0) at ResourceManager
2020-02-07 19:28:46,425 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 8c402a9c039d3c33466631510c48b552 (akka.tcp://flink@ip-2:37120/user/taskmanager_0) at ResourceManager
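One thing I am unsure about: my understanding is that in a standalone cluster each TaskManager reads the flink-conf.yaml on its own machine, so the port range has to be present on every worker node (not just on the JobManager host) and the TaskManagers restarted before it takes effect. For reference, a minimal sketch of what I think the worker-side flink-conf.yaml should contain (the comments reflect my assumptions, not verified behaviour):

    # flink-conf.yaml on each worker node (sketch)
    jobmanager.rpc.address: 10.0.16.10
    jobmanager.rpc.port: 6123
    # Range the TaskManager actor system binds to; as far as I can tell this is
    # the port that appears as akka.tcp://flink@<host>:<port>/user/taskmanager_0
    taskmanager.rpc.port: 50100-50200
    # Range for the blob server (served by the JobManager)
    blob.server.port: 50201-50300

Please correct me if this is not how these options are picked up.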
Concretely, the settings I have in place are:

taskmanager.rpc.port : 50100-50200
blob.server.port : 50201-50300

So how do I control the port that the TaskManager uses? Despite the settings above, the TaskManagers are still being registered at ports 34718 and 37120.

Thanks,
Milind

On Thu, Feb 6, 2020 at 5:25 PM Yang Wang <danrtsey...@gmail.com> wrote:

> Maybe you forgot to limit the blob server port (blob.server.port) to the
> range.
>
> Best,
> Yang
>
> Milind Vaidya <kava...@gmail.com> wrote on Fri, Feb 7, 2020 at 7:03 AM:
>
>> I figured out that it was a problem with the ports: 39493/34094 were not
>> accessible. So to get this working I opened all the ports (0-65535) for
>> the security group.
>>
>> How do I control that if I want to open only a certain range of ports?
>>
>> Is "taskmanager.rpc.port" the right parameter to set? I did try setting
>> it to a certain port range, but it did not work.
>>
>> Thanks
>> Milind
>>
>> On Wed, Feb 5, 2020 at 11:22 AM Milind Vaidya <kava...@gmail.com> wrote:
>>
>>> The cluster is set up on AWS with 1 JobManager and 2 TaskManagers. They
>>> all belong to the same security group, with access granted on ports
>>> 6123, 8081, and 50100-50200.
>>>
>>> The JobManager config is as follows:
>>>
>>> FLINK_PLUGINS_DIR : /usr/local/flink-1.9.1/plugins
>>> io.tmp.dirs : /tmp/flink
>>> jobmanager.execution.failover-strategy : region
>>> jobmanager.heap.size : 1024m
>>> jobmanager.rpc.address : 10.0.16.10
>>> jobmanager.rpc.port : 6123
>>> jobstore.cache-size : 52428800
>>> jobstore.expiration-time : 3600
>>> parallelism.default : 4
>>> slot.idle.timeout : 50000
>>> slot.request.timeout : 300000
>>> task.cancellation.interval : 30000
>>> task.cancellation.timeout : 180000
>>> task.cancellation.timers.timeout : 7500
>>> taskmanager.exit-on-fatal-akka-error : false
>>> taskmanager.heap.size : 1024m
>>> taskmanager.network.bind-policy : "ip"
>>> taskmanager.numberOfTaskSlots : 2
>>> taskmanager.registration.initial-backoff : 500ms
>>> taskmanager.registration.timeout : 5min
>>> taskmanager.rpc.port : 50100-50200
>>> web.tmpdir : /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0
>>>
>>> I have summarised more details in a Stack Overflow question, where it is
>>> easier to lay out the various details:
>>>
>>> https://stackoverflow.com/questions/60082479/flink-1-9-standalone-cluster-failed-to-transfer-file-from-taskexecutor-id
>>>
>>> On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I don't think this is a bug. It looks like the machines cannot talk to
>>>> each other. Can you validate that all the machines can talk to each
>>>> other on the ports used by Flink (6123, 8081, ...)?
>>>> If that doesn't help:
>>>> - How is the network set up?
>>>> - Are you running physical machines / VMs / containers?
>>>> - Is there a firewall involved?
>>>>
>>>> Best,
>>>> Robert
>>>>
>>>> On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya <kava...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I am trying to build a Flink cluster with 1 master and 2 workers. The
>>>>> program works fine locally: messages are read from Kafka and simply
>>>>> printed to STDOUT.
>>>>>
>>>>> The cluster is created successfully and the UI also shows all the
>>>>> config, but the job fails to execute on the cluster.
>>>>>
>>>>> Here are a few exceptions I see in the log files.
>>>>>
>>>>> File : flink-root-standalonesession
>>>>>
>>>>> 2020-01-29 19:55:00,348 INFO  akka.remote.transport.ProtocolStateActor  - No response from remote for outbound association.
>>>>> Associate timed out after [20000 ms].
>>>>> 2020-01-29 19:55:00,350 INFO  akka.remote.transport.ProtocolStateActor  - No response from remote for outbound association. Associate timed out after [20000 ms].
>>>>> 2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:39493]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
>>>>> 2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:34094]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
>>>>> 2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:39493
>>>>> 2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:34094
>>>>> 2020-01-29 19:58:21,880 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler  - Failed to transfer file from TaskExecutor a7abe6e294fa3ae4129fd695f7309a36.
>>>>> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#5385019]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>>>>>
>>>>> File : flink-root-client-ip
>>>>>
>>>>> 2020-01-29 19:48:10,566 WARN  org.apache.flink.client.cli.CliFrontend  - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
>>>>> java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
>>>>>         at java.lang.Class.forName0(Native Method)
>>>>>         at java.lang.Class.forName(Class.java:264)
>>>>>         at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
>>>>>         at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
>>>>>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>         ... 5 more
>>>>> 2020-01-29 19:48:10,663 INFO  org.apache.flink.core.fs.FileSystem  - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
>>>>> 2020-01-29 19:48:10,856 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
>>>>> 2020-01-29 19:48:10,874 INFO  org.apache.flink.runtime.security.SecurityUtils  - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
>>>>> 2020-01-29 19:48:10,875 INFO  org.apache.flink.client.cli.CliFrontend  - Running 'run' command.
>>>>> 2020-01-29 19:48:10,881 INFO  org.apache.flink.client.cli.CliFrontend  - Building program from JAR file
>>>>> 2020-01-29 19:48:10,965 INFO  org.apache.flink.configuration.Configuration  - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>>>>> 2020-01-29 19:48:11,160 INFO  org.apache.flink.runtime.rest.RestClient  - Rest client endpoint started.
>>>>> 2020-01-29 19:48:11,163 INFO  org.apache.flink.client.cli.CliFrontend  - Starting execution of program
>>>>> 2020-01-29 19:48:11,163 INFO  org.apache.flink.client.program.rest.RestClusterClient  - Starting program in interactive mode (detached: false)
>>>>> 2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.address, ip
>>>>> 2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
>>>>> 2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.size, 1024m
>>>>> 2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.heap.size, 1024m
>>>>> 2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 2
>>>>> 2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: parallelism.default, 4
>>>>> 2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.execution.failover-strategy, region
>>>>> 2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: io.tmp.dirs, /tmp/flink
>>>>> 2020-01-29 19:48:11,311 INFO  org.apache.flink.client.program.rest.RestClusterClient  - Submitting job 4f4cce35db3f37cae310f272ec88a303 (detached: false).
>>>>> 2020-01-29 20:05:13,170 INFO  org.apache.flink.runtime.rest.RestClient  - Shutting down rest endpoint.
>>>>> 2020-01-29 20:05:13,172 INFO  org.apache.flink.runtime.rest.RestClient  - Rest endpoint shutdown complete.
>>>>> 2020-01-29 20:05:13,172 ERROR org.apache.flink.client.cli.CliFrontend  - Error while running the command.
>>>>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>>>>> (JobID: 4f4cce35db3f37cae310f272ec88a303)
>>>>>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>>>>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>>>>>         at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
>>>>>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>         at com.saavn.flink.SongCountStreamingJob.main(SongCountStreamingJob.java:79)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>>>>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>>>>         at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>>>>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>>>>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>>>>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>>>>         at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>         at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>>>>         ... 18 more
>>>>> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id a7abe6e294fa3ae4129fd695f7309a36 timed out.
>>>>>         at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
>>>>>         at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>
>>>>> Flink version : flink-1.9.1
>>>>> OS : CentOS Linux release 7.6.1810 (Core)
>>>>>
>>>>> Is this related to this issue:
>>>>> https://issues.apache.org/jira/browse/FLINK-11143
>>>>>
>>>>> Can somebody shed some light on this?
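For anyone hitting the same association timeouts: the akka.tcp://flink-metrics@ip:39493 and :34094 addresses in the log above belong, as far as I can tell, to Flink's internal metrics query service, which binds to a random port unless it is pinned, so opening only 6123, 8081, and 50100-50200 in the security group is not enough on its own. Below is a sketch of the additional flink-conf.yaml entries that should keep every internally used port inside an explicit range; the key names are taken from the Flink 1.9 documentation and are an assumption here, not something verified on this cluster:

    # Pin the internal metrics query service (the "flink-metrics" actor system),
    # which otherwise picks a random port on every start -- assumed key name:
    metrics.internal.query-service.port: 50301-50400
    # Pin the TaskManager data-exchange port (0 = random by default) -- assumed:
    taskmanager.data.port: 50401

With these set on every node, the security group should only need 6123 (JobManager RPC), 8081 (web UI/REST), and the chosen ranges (e.g. roughly 50100-50401 with the settings discussed earlier in this thread) opened, rather than all ports.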