Hi Teodor,
can you share the implementation of CountSource and Printer (a GitHub
link, if you have them in a public repo)? What changed in Beam 2.25.0
(if I recall correctly) is how the Read transform is translated. It now
uses SDF (splittable DoFn), so there might have been something broken
before, and the change in the translation merely revealed the problem.
You can check whether the problem disappears if you add
use_deprecated_read to the experiments of the PipelineOptions. See
Highlights in [1].
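For illustration, a rough (untested) sketch of how that could look with
your options setup, using the generic ExperimentalOptions interface
from the Beam SDK (note that setExperiments replaces any experiments
already set):

    import java.util.Collections;
    import org.apache.beam.sdk.options.ExperimentalOptions;

    FlinkPipelineOptions options =
            PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // ask the runner to use the legacy (pre-SDF) translation of Read
    options.as(ExperimentalOptions.class)
           .setExperiments(Collections.singletonList("use_deprecated_read"));

If you build the options from command-line arguments instead
(PipelineOptionsFactory.fromArgs(args)), passing
--experiments=use_deprecated_read should have the same effect.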
Jan
[1] https://beam.apache.org/blog/beam-2.25.0/
On 5/6/21 9:28 AM, Teodor Spæren wrote:
Hey!
I'm having problems with a program that I used to be able to run just
fine on Flink, but now I'm getting a NullPointerException.
The Beam program in question looks like this:
package no.spaeren.thesis.benchmarks.beam;

import no.spaeren.thesis.benchmarks.beam.helpers.CountSource;
import no.spaeren.thesis.benchmarks.beam.helpers.Printer;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import picocli.CommandLine;

import java.util.concurrent.Callable;

@CommandLine.Command(name = "BeamSimple", mixinStandardHelpOptions = true,
        description = "A simple beam job")
public class BeamSimple implements Callable<Void> {

    @CommandLine.Option(names = {"--from"}, defaultValue = "0")
    final Long from = 0L;

    @CommandLine.Option(names = {"--to"}, defaultValue = "1000")
    final Long to = 1000L;

    @Override
    public Void call() {
        FlinkPipelineOptions options =
                PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        options.setDisableMetrics(true);
        options.setRunner(FlinkRunner.class);
        // options.setShutdownSourcesAfterIdleMs(100L);
        // options.setParallelism(2);

        Pipeline p = Pipeline.create(options);

        // final PCollection<Long> ds = p.apply(GenerateSequence.from(this.from).to(this.to));
        final PCollection<Long> ds =
                p.apply(Read.from(new CountSource(this.from, this.to)));

        final PCollection<Long> db =
                ds.apply(MapElements.into(TypeDescriptors.longs()).via((Long x) -> x * 2));

        db.apply(ParDo.of(new Printer<>("BeamSimple: %d\n")));

        p.run().waitUntilFinish();
        return null;
    }
}
I'm using Beam version 2.27.0, but it also happens on 2.29.0. The
Flink version is 1.12.1.
And the error message I'm getting is:
dragon % JAVA_HOME="/usr/lib/jvm/java-8-openjdk" ~/madsci/thesis/world/tools/flink/bin/flink info -p 1 ~/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar BeamSimple
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

Classpath: [file:/home/rhermes/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar]

System.out: (none)

System.err: java.lang.NullPointerException
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:38)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(ImmutableList.java:94)
    at org.apache.beam.sdk.coders.NullableCoder.getCoderArguments(NullableCoder.java:102)
    at org.apache.beam.sdk.coders.StructuredCoder.getComponents(StructuredCoder.java:49)
    at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
    at java.util.AbstractList.hashCode(AbstractList.java:541)
    at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
    at java.util.AbstractList.hashCode(AbstractList.java:541)
    at org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Hashing.smearedHash(Hashing.java:54)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.get(HashBiMap.java:254)
    at org.apache.beam.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:265)
    at org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:231)
    at org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:210)
    at org.apache.beam.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:176)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:239)
    at org.apache.beam.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:743)
    at org.apache.beam.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:758)
    at org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:289)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:587)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:267)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:217)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:118)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:92)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
    at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:45)
    at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:18)
    at picocli.CommandLine.executeUserObject(CommandLine.java:1853)
    at picocli.CommandLine.access$1100(CommandLine.java:145)
    at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2243)
    at picocli.CommandLine$RunLast.handle(CommandLine.java:2237)
    at picocli.CommandLine$RunLast.handle(CommandLine.java:2201)
    at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2068)
    at picocli.CommandLine.execute(CommandLine.java:1978)
    at no.spaeren.thesis.App.main(App.java:24)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
    at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
    at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:184)
    at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Any help with this would be greatly appreciated.
Best regards,
Teodor Spæren