I figured out that it was a problem with the ports: 39493 and 34094 were not accessible. To get this working, I opened all ports (0-65535) for the security group.
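A quick way to confirm which ports are actually reachable between the machines is a plain TCP connect test. The following is only a minimal sketch, not part of Flink: the helper name `check_ports` is hypothetical, and it assumes that a successful TCP connection is a good enough signal that the security group lets the traffic through.

```python
import socket


def check_ports(host, ports, timeout=2.0):
    """Try a TCP connection to each port on host.

    Returns a dict mapping port -> True if the connection succeeded,
    False if it was refused, timed out, or failed in any other way.
    """
    results = {}
    for port in ports:
        try:
            # create_connection raises OSError (including timeouts) on failure
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:
            results[port] = False
    return results
```

For example, running `check_ports("10.0.16.10", [6123, 8081])` from a task manager host tests the job manager address and ports listed in the config quoted below; the two ports from the log (39493 and 34094) can be tested the same way against the host that reported them.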
How can I open only a certain range of ports instead of all of them? Is "taskmanager.rpc.port" the right parameter to set? I tried setting it to a port range, but that did not work.

Thanks
Milind

On Wed, Feb 5, 2020 at 11:22 AM Milind Vaidya <kava...@gmail.com> wrote:

> The cluster is set up on AWS with 1 Job manager and 2 task managers.
> They all belong to same security group with 6123, 8081, 50100 - 50200
> ports having access granted
>
> Job manager config is as follows :
>
> FLINK_PLUGINS_DIR : /usr/local/flink-1.9.1/plugins
> io.tmp.dirs : /tmp/flink
> jobmanager.execution.failover-strategy : region
> jobmanager.heap.size : 1024m
> jobmanager.rpc.address : 10.0.16.10
> jobmanager.rpc.port : 6123
> jobstore.cache-size : 52428800
> jobstore.expiration-time : 3600
> parallelism.default : 4
> slot.idle.timeout : 50000
> slot.request.timeout : 300000
> task.cancellation.interval : 30000
> task.cancellation.timeout : 180000
> task.cancellation.timers.timeout : 7500
> taskmanager.exit-on-fatal-akka-error : false
> taskmanager.heap.size : 1024m
> taskmanager.network.bind-policy : "ip"
> taskmanager.numberOfTaskSlots : 2
> taskmanager.registration.initial-backoff: 500ms
> taskmanager.registration.timeout : 5min
> taskmanager.rpc.port : 50100-50200
> web.tmpdir : /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0
>
> I have summarised the more details in a stack overflow question where it
> is easier to put the various details.
>
> https://stackoverflow.com/questions/60082479/flink-1-9-standalone-cluster-failed-to-transfer-file-from-taskexecutor-id
>
> On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi,
>>
>> I don't think this is a bug. It looks like the machines can not talk to
>> each other. Can you validate that all the machines can talk to each other
>> on the ports used by Flink (6123, 8081, ...)
>> If that doesn't help:
>> - How is the network set up?
>> - Are you running physical machines / VMs / containers?
>> - Is there a firewall involved?
>>
>> Best,
>> Robert
>>
>>
>> On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya <kava...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am trying to build a cluster for flink with 1 master and 2 workers.
>>> The program is working fine locally. The messages are read from Kafka
>>> and just printed on STDOUT.
>>>
>>> The cluster is successfully created and the UI also shows all config. But
>>> the job fails to execute on the cluster.
>>>
>>> Here are a few exceptions I see in the log files
>>>
>>> File : flink-root-standalonesession
>>>
>>> 2020-01-29 19:55:00,348 INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [20000 ms].
>>> 2020-01-29 19:55:00,350 INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [20000 ms].
>>> 2020-01-29 19:55:00,350 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:39493]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
>>> 2020-01-29 19:55:00,350 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:34094]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
>>> 2020-01-29 19:55:00,359 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:39493
>>> 2020-01-29 19:55:00,359 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:34094
>>> 2020-01-29 19:58:21,880 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor a7abe6e294fa3ae4129fd695f7309a36.
>>> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#5385019]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>>>
>>>
>>> File : flink-root-client-ip
>>>
>>>
>>> 2020-01-29 19:48:10,566 WARN org.apache.flink.client.cli.CliFrontend - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
>>> java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
>>>     at java.lang.Class.forName0(Native Method)
>>>     at java.lang.Class.forName(Class.java:264)
>>>     at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
>>>     at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>     ... 5 more
>>> 2020-01-29 19:48:10,663 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
>>> 2020-01-29 19:48:10,856 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
>>> 2020-01-29 19:48:10,874 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
>>> 2020-01-29 19:48:10,875 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command.
>>> 2020-01-29 19:48:10,881 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file
>>> 2020-01-29 19:48:10,965 INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>>> 2020-01-29 19:48:11,160 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>>> 2020-01-29 19:48:11,163 INFO org.apache.flink.client.cli.CliFrontend - Starting execution of program
>>> 2020-01-29 19:48:11,163 INFO org.apache.flink.client.program.rest.RestClusterClient - Starting program in interactive mode (detached: false)
>>> 2020-01-29 19:48:11,306 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, ip
>>> 2020-01-29 19:48:11,306 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
>>> 2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
>>> 2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 2
>>> 2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 4
>>> 2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
>>> 2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: io.tmp.dirs, /tmp/flink
>>> 2020-01-29 19:48:11,311 INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4f4cce35db3f37cae310f272ec88a303 (detached: false).
>>> 2020-01-29 20:05:13,170 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
>>> 2020-01-29 20:05:13,172 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
>>> 2020-01-29 20:05:13,172 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 4f4cce35db3f37cae310f272ec88a303)
>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>     at com.saavn.flink.SongCountStreamingJob.main(SongCountStreamingJob.java:79)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>>     ... 18 more
>>> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id a7abe6e294fa3ae4129fd695f7309a36 timed out.
>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>
>>>
>>> Flink version : flink-1.9.1
>>> OS : CentOS Linux release 7.6.1810 (Core)
>>>
>>> Is this related to this issue :
>>> https://issues.apache.org/jira/browse/FLINK-11143
>>>
>>> Can somebody throw some light on this ?