Alice,

Totally agree with what Lukasz said. Also, as an alternative way to test your job, I suggest installing Flink locally and running the Beam pipeline with a single CLI command like this:

    bin/flink run -c <main_class> /path/to/jar --runner=FlinkRunner
> On 18 Jul 2018, at 17:09, Lukasz Cwik wrote:
>
> Yes, you should be able to submit jobs to a Flink master from anywhere you have network connectivity to the Flink master.
>
> It looks like your job is being submitted to the Flink master and we start waiting for the job to complete, but something is causing the job to not complete successfully. Have you tried looking at the Flink master UI or Flink master logs?
>
> On Fri, Jul 13, 2018 at 6:45 PM Alice Wong wrote:
>
> Hello,
>
> I am a newbie to Beam.
>
> Following the Beam docs, I am trying to submit the example WordCount to a Flink cluster (one jobmanager and one taskmanager running locally in two linked Docker containers with Maven installed).
>
> The Beam docs seem a bit confusing about how to submit jobs.
>
> In https://beam.apache.org/get-started/quickstart-java/, it mentions I should use
>
>     mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>         -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
>         --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
>
> where <flink master> seems to be just a hostname.
>
> In https://beam.apache.org/documentation/runners/flink/, it says I should use
>
>     $ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>         -Pflink-runner \
>         -Dexec.args="--runner=FlinkRunner \
>           --inputFile=/path/to/pom.xml \
>           --output=/path/to/counts \
>           --flinkMaster=<flink master url> \
>           --filesToStage=target/word-count-beam--bundled-0.1.jar"
>
> where I can give localhost:8081 for flinkMaster.
>
> I have tried running this command both outside Docker and in the jobmanager container (with the exec command). Either way, if I use "localhost" without a port for <flink master url>, it just runs locally and ignores the Flink cluster. If I use "localhost:8081", the command hangs for about 5 seconds and shows the following error messages. It eventually disconnects and dies.
>
> Could you give me a hint about how Beam jobs are submitted to a Flink cluster in general? Can I submit them remotely, from outside the jobmanager node?
>
> Thanks in advance!
>
> --------------------------------------
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Executing pipeline using FlinkRunner.
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Translating pipeline to Flink program.
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
> INFO: Creating a Batch Execution Environment.
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
> INFO: enterCompositeTransform-
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
> INFO: | enterCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator visitPrimitiveTransform
> INFO: | | visitPrimitiveTransform- ReadLines/Read
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
> INFO: | leaveCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
> INFO: | enterCompositeTransform- WordCount.CountWords
> ...
> INFO: | | leaveCompositeTransform- WriteCounts/WriteFiles
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
> INFO: | leaveCompositeTransform- WriteCounts
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
> INFO: leaveCompositeTransform-
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Starting execution of Flink program.
> Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment createProgramPlan
> INFO: The job has 0 registered types and 0 default Kryo serializers
> Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
> INFO: Filepattern pom.xml matched 1 files with total size 10600
> Jul 14, 2018 12:49:49 AM org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
> INFO: Starting client actor system.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: Trying to select the network interface and address to use by connecting to the leading JobManager.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener findConnectingAddress
> INFO: Retrieved new target address localhost/127.0.0.1:8081.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Trying to start actor system at c4342d15c3f4:0
> Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 applyOrElse
> INFO: Slf4jLogger started
> Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> INFO: Starting remoting
> Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> INFO: Remoting started; listening on addresses :[akka.tcp://flink@c4342d15c3f4:46627]
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
> Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient logAndSysout
> INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
> Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor handleMessage
> INFO: Received SubmitJobAndWait(JobGraph(jobId: 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a JobManager yet.
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> INFO: Received job wordcount-root-0714004948-3ee44b3d (87a12b5471d39a7837d6b0def6d748e2).
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
> INFO: Disconnect from JobManager null.
> Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
> WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1213486164 - discarded
> Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
> WARNING: Association with remote system [akka.tcp://flink@localhost:8081] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system explicitly disassociated (reason unknown).]
> ...
> Jul 14, 2018 12:50:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> INFO: Remoting shut down.
> Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
> SEVERE: Pipeline execution failed
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
>     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
>     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
>     ...
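
Two symptoms in Alice's report can be checked with a small, stdlib-only Java sketch. First, "localhost" without a port ran locally: the hypothetical classifyMaster helper below is a simplified, assumed restatement of how the FlinkRunner of that era appeared to read --flinkMaster (special values like "[local]" and "[auto]", "host:port" for a remote cluster, and anything unrecognized falling back to "[auto]", which under mvn exec:java means an in-process local environment). Please check FlinkExecutionEnvironments in your Beam version rather than rely on it. Second, the TooLongFrameException: assuming the Netty frame decoder reads a 4-byte big-endian length and reports it with those 4 header bytes added, subtracting 4 from 1213486164 and reading the result as ASCII gives "HTTP". That suggests, though nobody in this thread has confirmed it, that the Akka-based client reached an HTTP endpoint on port 8081 (Flink's default web/REST port) instead of the JobManager RPC port (jobmanager.rpc.port, 6123 by default).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FlinkMasterCheck {

    // Hypothetical helper: a simplified restatement (an assumption, not Beam's
    // actual source) of how the FlinkRunner interpreted --flinkMaster.
    static String classifyMaster(String masterUrl) {
        switch (masterUrl) {
            case "[local]":
                return "local";
            case "[collection]":
                return "collection";
            case "[auto]":
                return "auto";
            default:
                // Only "host:port" selects a remote cluster.
                if (masterUrl.matches(".*:\\d+")) {
                    return "remote";
                }
                // Anything else (e.g. a bare hostname) falls back to auto-detection,
                // which under `mvn exec:java` runs the job locally.
                return "auto";
        }
    }

    // Hypothetical helper: undo the frame-length adjustment and read the
    // 4-byte big-endian length field as ASCII characters.
    static String decodeFramePrefix(long adjustedLength, int headerBytes) {
        int rawLength = (int) (adjustedLength - headerBytes);
        byte[] bytes = ByteBuffer.allocate(4).putInt(rawLength).array();
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(classifyMaster("localhost"));       // prints auto
        System.out.println(classifyMaster("localhost:8081"));  // prints remote
        System.out.println(decodeFramePrefix(1213486164L, 4)); // prints HTTP
    }
}
```

If that reading holds, pointing --flinkMaster at the JobManager RPC port (for example localhost:6123 with default settings) is worth trying first, along with checking that the Flink version bundled with the Beam runner matches the cluster's.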
