Hi everyone,

I've written a Beam pipeline that uses a side input, which in my case is a 
Map of key/value pairs. I'm running Flink (1.7.1) on YARN (Hadoop 2.6.0). If 
the map is small enough everything works fine, but once it grows to around 
2-3 MB the pipeline fails with:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:117)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at com.cerner.pophealth.vx130.processing.console.SideInputRunner.main(SideInputRunner.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    .. 12 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 235b5466595d04d19df4f242531b51f3)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:123)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:114)
    .. 20 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    .. 25 more
Caused by: java.lang.RuntimeException: Could not retrieve next input split.
    at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
    .. 3 more
Caused by: java.util.concurrent.TimeoutException
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
    .. 4 more

I even managed to reduce my pipeline to a WordCount-like job that uses a side 
input, and I see the same result. I've tried increasing akka.ask.timeout, but 
that doesn't help: it only increases the amount of time before the pipeline 
fails. With the DirectRunner the pipeline works, though it can take a while 
(several minutes for a simple test run).
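
For readers following the trace: its innermost cause is a plain 
CompletableFuture.get(timeout, unit) call timing out. The sketch below is a 
minimal illustration using only the JDK, not Flink or Beam code; the class 
AskTimeoutSketch and the method waitForReply are hypothetical names made up 
for this example. It shows why a larger timeout only postpones the failure 
when the reply never arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a caller waiting on a reply that may never arrive.
class AskTimeoutSketch {

    /**
     * Waits up to timeoutMillis for the reply.
     * Returns the elapsed milliseconds if the wait timed out, or -1 if a reply arrived.
     */
    static long waitForReply(CompletableFuture<String> reply, long timeoutMillis) {
        long start = System.nanoTime();
        try {
            reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return -1;
        } catch (TimeoutException e) {
            // Same exception type as the innermost cause in the trace above.
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Assumption for illustration: the reply is never completed.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println("short timeout gave up after ~"
                + waitForReply(neverCompleted, 100) + " ms");
        System.out.println("long timeout gave up after ~"
                + waitForReply(neverCompleted, 400) + " ms");
    }
}
```

Both calls end in a TimeoutException; the second one just takes longer to get 
there, which matches what I see when raising akka.ask.timeout.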

After turning on debug logging and looking at the Flink logs, the most 
interesting part I see is this:

DEBUG org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization  - Getting Broadcast Variable (2e9d9a71db773d6c2023b49ba803785b "org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:390#a32dc9f64f1df03a" (1)) - First access, materializing.

This is followed by numerous heartbeat messages before the job fails. The 
message is definitely about fetching my side input, and the class should log 
another message once it has fetched all the data, but it never does. I tried 
to dig further into this, but all I found were interfaces and abstract 
classes for reading and serializing data.

Can anyone recommend what I can try from here?
