Hey!
I'm having problems with a program that I used to be able to run just
fine with flink, but now I'm getting a null pointer exception.
The beam program in question looks like this:
package no.spaeren.thesis.benchmarks.beam;
import no.spaeren.thesis.benchmarks.beam.helpers.CountSource;
import no.spaeren.thesis.benchmarks.beam.helpers.Printer;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import picocli.CommandLine;
import java.util.concurrent.Callable;
@CommandLine.Command(name = "BeamSimple", mixinStandardHelpOptions = true, description =
"A simple beam job")
public class BeamSimple implements Callable<Void> {
@CommandLine.Option(names = {"--from"}, defaultValue = "0")
final Long from = 0L;
@CommandLine.Option(names = {"--to"}, defaultValue = "1000")
final Long to = 1000L;
@Override
public Void call() {
FlinkPipelineOptions options =
PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
options.setDisableMetrics(true);
options.setRunner(FlinkRunner.class);
// options.setShutdownSourcesAfterIdleMs(100L);
// options.setParallelism(2);
Pipeline p = Pipeline.create(options);
//final PCollection<Long> ds =
p.apply(GenerateSequence.from(this.from).to(this.to));
final PCollection<Long> ds = p.apply(Read.from(new
CountSource(this.from, this.to)));
final PCollection<Long> db =
ds.apply(MapElements.into(TypeDescriptors.longs()).via((Long x) -> x * 2));
db.apply(ParDo.of(new Printer<>("BeamSimple: %d\n")));
p.run().waitUntilFinish();
return null;
}
}
I'm using Beam version 2.27.0, but it also happens on 2.29.0. The Flink
version is 1.12.1.
And the error message I'm getting is:
dragon % JAVA_HOME="/usr/lib/jvm/java-8-openjdk"
~/madsci/thesis/world/tools/flink/bin/flink info -p 1
~/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar BeamSimple
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program plan
could not be fetched - the program aborted pre-maturely.
Classpath:
[file:/home/rhermes/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar]
System.out: (none)
System.err: java.lang.NullPointerException
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:38)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(ImmutableList.java:94)
at
org.apache.beam.sdk.coders.NullableCoder.getCoderArguments(NullableCoder.java:102)
at
org.apache.beam.sdk.coders.StructuredCoder.getComponents(StructuredCoder.java:49)
at
org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
at java.util.AbstractList.hashCode(AbstractList.java:541)
at
org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
at java.util.AbstractList.hashCode(AbstractList.java:541)
at
org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Hashing.smearedHash(Hashing.java:54)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.get(HashBiMap.java:254)
at
org.apache.beam.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:265)
at
org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:231)
at
org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:210)
at
org.apache.beam.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:176)
at
org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:239)
at
org.apache.beam.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:743)
at
org.apache.beam.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:758)
at
org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
at
org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:289)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:587)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:267)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:217)
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:118)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:92)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:45)
at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:18)
at picocli.CommandLine.executeUserObject(CommandLine.java:1853)
at picocli.CommandLine.access$1100(CommandLine.java:145)
at
picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2243)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2237)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2201)
at
picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2068)
at picocli.CommandLine.execute(CommandLine.java:1978)
at no.spaeren.thesis.App.main(App.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
at
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
at
org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
at
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:184)
at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Any help with this would be greatly appriciated.
Best regards,
Teodor Spæren