Hello,

I am a newbie to Beam.

Following the Beam docs, I am trying to submit the WordCount example to a
Flink cluster (one JobManager and one TaskManager running locally in two
linked Docker containers, with Maven installed).

The Beam docs seem a bit confusing about how to submit jobs, since two pages
give different instructions.

In https://beam.apache.org/get-started/quickstart-java/, it mentions I
should use

mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

where <flink master> seems to be just a hostname.

In https://beam.apache.org/documentation/runners/flink/, it says I should
use

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"

where, as I understand it, I can give localhost:8081 as the flinkMaster.

I have tried running this command both outside Docker and inside the
JobManager container (via docker exec). Either way, if I use "localhost"
without a port for <flink master url>, the job just runs locally and ignores
the Flink cluster. If I use "localhost:8081", the command hangs for about 5
seconds and prints the error messages below. It eventually disconnects and
dies.

Could you give some hints on how Beam jobs are submitted to a Flink cluster
in general? Can I submit them remotely, from outside the JobManager node?

Thanks in advance!

--------------------------------------
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Executing pipeline using FlinkRunner.
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Translating pipeline to Flink program.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
INFO: Creating a Batch Execution Environment.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO:  enterCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: |    enterCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator visitPrimitiveTransform
INFO: |   |    visitPrimitiveTransform- ReadLines/Read
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: |    leaveCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: |    enterCompositeTransform- WordCount.CountWords
...
INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: |    leaveCompositeTransform- WriteCounts
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO:  leaveCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Starting execution of Flink program.
Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment createProgramPlan
INFO: The job has 0 registered types and 0 default Kryo serializers
Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern pom.xml matched 1 files with total size 10600
Jul 14, 2018 12:49:49 AM org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
INFO: Starting client actor system.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: Trying to select the network interface and address to use by connecting to the leading JobManager.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener findConnectingAddress
INFO: Retrieved new target address localhost/127.0.0.1:8081.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Trying to start actor system at c4342d15c3f4:0
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 applyOrElse
INFO: Slf4jLogger started
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Starting remoting
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting started; listening on addresses :[akka.tcp://flink@c4342d15c3f4:46627]
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient logAndSysout
INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor handleMessage
INFO: Received SubmitJobAndWait(JobGraph(jobId: 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a JobManager yet.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
INFO: Received job wordcount-root-0714004948-3ee44b3d (87a12b5471d39a7837d6b0def6d748e2).
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
INFO: Disconnect from JobManager null.
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1213486164 - discarded
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Association with remote system [akka.tcp://flink@localhost:8081] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system explicitly disassociated (reason unknown).]
...
Jul 14, 2018 12:50:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting shut down.
Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
...
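
One observation on the TooLongFrameException in the log: the "adjusted
frame length" 1213486164 looks like ASCII text that was read as a 4-byte
length prefix. The sketch below decodes it, assuming that Akka's Netty frame
decoder reports the raw 32-bit length plus the 4 bytes of the length field
itself (so the raw prefix is 1213486164 - 4). Under that assumption, the first
four bytes that came back from localhost:8081 spell "HTTP", which would
suggest that whatever listens on 8081 (by default, the Flink web UI) answered
with an HTTP response instead of speaking Akka.

```shell
#!/usr/bin/env bash
# "Adjusted frame length" copied from the TooLongFrameException warning above.
adjusted=1213486164

# Assumption: the Netty frame decoder adds the 4-byte length field to the value
# it read, so subtract 4 to recover the raw 32-bit length prefix.
raw=$((adjusted - 4))

# Render the raw prefix as 8 hex digits (4 bytes).
hex=$(printf '%08x' "$raw")

# Convert each hex byte pair into its ASCII character.
ascii=""
for i in 0 2 4 6; do
  ascii+=$(printf '%b' "\\x${hex:$i:2}")
done

echo "raw length prefix: 0x$hex = \"$ascii\""
```

Running it prints: raw length prefix: 0x48545450 = "HTTP"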
