Hey!

I'm having problems with a program that used to run just fine on Flink, but now fails with a NullPointerException.

The Beam program in question looks like this:

package no.spaeren.thesis.benchmarks.beam;

import no.spaeren.thesis.benchmarks.beam.helpers.CountSource;
import no.spaeren.thesis.benchmarks.beam.helpers.Printer;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import picocli.CommandLine;

import java.util.concurrent.Callable;

@CommandLine.Command(name = "BeamSimple", mixinStandardHelpOptions = true,
        description = "A simple beam job")
public class BeamSimple implements Callable<Void> {

    @CommandLine.Option(names = {"--from"}, defaultValue = "0")
    final Long from = 0L;

    @CommandLine.Option(names = {"--to"}, defaultValue = "1000")
    final Long to = 1000L;


    @Override
    public Void call() {
        FlinkPipelineOptions options =
                PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        options.setDisableMetrics(true);
        options.setRunner(FlinkRunner.class);
        // options.setShutdownSourcesAfterIdleMs(100L);
        // options.setParallelism(2);
        Pipeline p = Pipeline.create(options);


        // final PCollection<Long> ds = p.apply(GenerateSequence.from(this.from).to(this.to));
        final PCollection<Long> ds = p.apply(Read.from(new CountSource(this.from, this.to)));
        final PCollection<Long> db =
                ds.apply(MapElements.into(TypeDescriptors.longs()).via((Long x) -> x * 2));

        db.apply(ParDo.of(new Printer<>("BeamSimple: %d\n")));


        p.run().waitUntilFinish();

        return null;
    }
}

I'm using Beam version 2.27.0, but it also happens on 2.29.0. The Flink version is 1.12.1.

And the error message I'm getting is:


dragon % JAVA_HOME="/usr/lib/jvm/java-8-openjdk" \
    ~/madsci/thesis/world/tools/flink/bin/flink info -p 1 \
    ~/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar BeamSimple
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

Classpath: [file:/home/rhermes/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar]

System.out: (none)

System.err: java.lang.NullPointerException
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:38)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(ImmutableList.java:94)
        at org.apache.beam.sdk.coders.NullableCoder.getCoderArguments(NullableCoder.java:102)
        at org.apache.beam.sdk.coders.StructuredCoder.getComponents(StructuredCoder.java:49)
        at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
        at java.util.AbstractList.hashCode(AbstractList.java:541)
        at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
        at java.util.AbstractList.hashCode(AbstractList.java:541)
        at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Hashing.smearedHash(Hashing.java:54)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.get(HashBiMap.java:254)
        at org.apache.beam.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:265)
        at org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:231)
        at org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:210)
        at org.apache.beam.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:176)
        at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:239)
        at org.apache.beam.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:743)
        at org.apache.beam.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:758)
        at org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
        at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:289)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:587)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468)
        at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:267)
        at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:217)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:118)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:92)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
        at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:45)
        at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:18)
        at picocli.CommandLine.executeUserObject(CommandLine.java:1853)
        at picocli.CommandLine.access$1100(CommandLine.java:145)
        at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2243)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2237)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2201)
        at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2068)
        at picocli.CommandLine.execute(CommandLine.java:1978)
        at no.spaeren.thesis.App.main(App.java:24)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
        at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
        at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)

        at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
        at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:184)
        at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)

Any help with this would be greatly appreciated.

Best regards,
Teodor Spæren
