Hi Teodor,

Can you share the implementation of CountSource and Printer (a GitHub link is fine, if you have them in a public repo)? What changed in Beam 2.25.0 (if I recall correctly) is how the Read transform is translated: it now uses a splittable DoFn (SDF). Something may have been broken before as well, and the change in translation merely revealed the problem. You can check whether the problem disappears if you add use_deprecated_read to the experiments in your PipelineOptions. See the Highlights section in [1].
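For example, something along these lines should do it (untested sketch; it just sets the experiment on your existing options through the ExperimentalOptions view, and passing --experiments=use_deprecated_read works the same way if you build the options with PipelineOptionsFactory.fromArgs):

    FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
    // Enable the legacy (non-SDF) translation of the Read transform.
    options.as(org.apache.beam.sdk.options.ExperimentalOptions.class)
            .setExperiments(java.util.Collections.singletonList("use_deprecated_read"));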

 Jan

[1] https://beam.apache.org/blog/beam-2.25.0/

On 5/6/21 9:28 AM, Teodor Spæren wrote:
Hey!

I'm having problems with a program that I used to be able to run just fine on Flink, but now I'm getting a NullPointerException.

The Beam program in question looks like this:

package no.spaeren.thesis.benchmarks.beam;

import no.spaeren.thesis.benchmarks.beam.helpers.CountSource;
import no.spaeren.thesis.benchmarks.beam.helpers.Printer;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import picocli.CommandLine;

import java.util.concurrent.Callable;

@CommandLine.Command(name = "BeamSimple", mixinStandardHelpOptions = true, description = "A simple beam job")
public class BeamSimple implements Callable<Void> {

    @CommandLine.Option(names = {"--from"}, defaultValue = "0")
    final Long from = 0L;

    @CommandLine.Option(names = {"--to"}, defaultValue = "1000")
    final Long to = 1000L;


    @Override
    public Void call() {
        FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        options.setDisableMetrics(true);
        options.setRunner(FlinkRunner.class);
        // options.setShutdownSourcesAfterIdleMs(100L);
        // options.setParallelism(2);
        Pipeline p = Pipeline.create(options);


        //final PCollection<Long> ds = p.apply(GenerateSequence.from(this.from).to(this.to));
        final PCollection<Long> ds = p.apply(Read.from(new CountSource(this.from, this.to)));
        final PCollection<Long> db = ds.apply(MapElements.into(TypeDescriptors.longs()).via((Long x) -> x * 2));

        db.apply(ParDo.of(new Printer<>("BeamSimple: %d\n")));


        p.run().waitUntilFinish();

        return null;
    }
}

I'm using Beam version 2.27.0, but it also happens on 2.29.0. The Flink version is 1.12.1.

And the error message I'm getting is:


dragon % JAVA_HOME="/usr/lib/jvm/java-8-openjdk" ~/madsci/thesis/world/tools/flink/bin/flink info -p 1 ~/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar BeamSimple
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

Classpath: [file:/home/rhermes/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar]

System.out: (none)

System.err: java.lang.NullPointerException
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:38)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(ImmutableList.java:94)
    at org.apache.beam.sdk.coders.NullableCoder.getCoderArguments(NullableCoder.java:102)
    at org.apache.beam.sdk.coders.StructuredCoder.getComponents(StructuredCoder.java:49)
    at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
    at java.util.AbstractList.hashCode(AbstractList.java:541)
    at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
    at java.util.AbstractList.hashCode(AbstractList.java:541)
    at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Hashing.smearedHash(Hashing.java:54)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.get(HashBiMap.java:254)
    at org.apache.beam.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:265)
    at org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:231)
    at org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:210)
    at org.apache.beam.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:176)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:239)
    at org.apache.beam.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:743)
    at org.apache.beam.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:758)
    at org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:289)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:587)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:267)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:217)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:118)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:92)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
    at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:45)
    at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:18)
    at picocli.CommandLine.executeUserObject(CommandLine.java:1853)
    at picocli.CommandLine.access$1100(CommandLine.java:145)
    at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2243)
    at picocli.CommandLine$RunLast.handle(CommandLine.java:2237)
    at picocli.CommandLine$RunLast.handle(CommandLine.java:2201)
    at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2068)
    at picocli.CommandLine.execute(CommandLine.java:1978)
    at no.spaeren.thesis.App.main(App.java:24)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
    at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)

    at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:184)
    at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)

Any help with this would be greatly appreciated.

Best regards,
Teodor Spæren
