Hello, I am a newbie to Beam.
Following the Beam docs, I am trying to submit the example WordCount to a Flink cluster (one jobmanager and one taskmanager running locally in two linked Docker containers with Maven installed). The Beam docs seem a bit confusing about how to submit jobs.

In https://beam.apache.org/get-started/quickstart-java/, it mentions I should use

    mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                     --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

where <flink master> seems to be just a hostname.

In https://beam.apache.org/documentation/runners/flink/, it says I should use

    $ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Pflink-runner \
        -Dexec.args="--runner=FlinkRunner \
          --inputFile=/path/to/pom.xml \
          --output=/path/to/counts \
          --flinkMaster=<flink master url> \
          --filesToStage=target/word-count-beam--bundled-0.1.jar"

where I can give localhost:8081 for flinkMaster.

I have tried running this command both outside Docker and inside the jobmanager container (with the exec command). Either way, if I use "localhost" without a port for <flink master url>, the job just runs locally and ignores the Flink cluster. If I use "localhost:8081", the command hangs for about 5 seconds and shows the following error messages. It eventually disconnects and dies.

Could you give me some hints on how Beam jobs are submitted to a Flink cluster in general? Can I do it remotely, from outside the jobmanager node?

Thanks in advance!

--------------------------------------
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Executing pipeline using FlinkRunner.
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Translating pipeline to Flink program.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
INFO: Creating a Batch Execution Environment.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: enterCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: | enterCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator visitPrimitiveTransform
INFO: | | visitPrimitiveTransform- ReadLines/Read
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: | leaveCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: | enterCompositeTransform- WordCount.CountWords
...
INFO: | | leaveCompositeTransform- WriteCounts/WriteFiles
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: | leaveCompositeTransform- WriteCounts
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: leaveCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Starting execution of Flink program.
Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment createProgramPlan
INFO: The job has 0 registered types and 0 default Kryo serializers
Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern pom.xml matched 1 files with total size 10600
Jul 14, 2018 12:49:49 AM org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
INFO: Starting client actor system.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: Trying to select the network interface and address to use by connecting to the leading JobManager.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener findConnectingAddress
INFO: Retrieved new target address localhost/127.0.0.1:8081.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Trying to start actor system at c4342d15c3f4:0
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 applyOrElse
INFO: Slf4jLogger started
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Starting remoting
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting started; listening on addresses :[akka.tcp://flink@c4342d15c3f4:46627]
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient logAndSysout
INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor handleMessage
INFO: Received SubmitJobAndWait(JobGraph(jobId: 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a JobManager yet.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
INFO: Received job wordcount-root-0714004948-3ee44b3d (87a12b5471d39a7837d6b0def6d748e2).
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
INFO: Disconnect from JobManager null.
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1213486164 - discarded
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Association with remote system [akka.tcp://flink@localhost:8081] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system explicitly disassociated (reason unknown).]
...
Jul 14, 2018 12:50:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting shut down.
Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
	at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
	at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
	...
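
A possible way to read the number in the TooLongFrameException warning from the log above, offered as a sketch rather than a confirmed diagnosis: a length-prefixed frame decoder interprets the first four bytes it receives as a frame length. The snippet assumes the "adjusted" length includes the 4-byte length field itself (an assumption about the decoder configuration, not something the log states) and converts the remaining value back into the bytes that would have been on the wire.

```python
import struct

# Value reported in the TooLongFrameException message in the log above.
adjusted_frame_length = 1213486164

# Assumption: the decoder added the size of its 4-byte length prefix to the
# value it read, so subtract it to recover the raw 32-bit length field.
LENGTH_FIELD_BYTES = 4
raw_length = adjusted_frame_length - LENGTH_FIELD_BYTES

# Re-encode as a big-endian unsigned 32-bit integer to see which bytes
# the client would have received first from localhost:8081.
first_bytes = struct.pack(">I", raw_length)
print(first_bytes)
```

If that assumption holds, the bytes are printable ASCII, which would be consistent with port 8081 answering over HTTP (for example a web UI or REST endpoint) while the client spoke Akka's framed TCP protocol. The log alone does not establish which port the client should target instead.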
