Hello

I'm attempting to upgrade the Apache Beam libraries from v2.19.0 to v2.37.0
(Java 8 & Maven), but have run into an issue with a breaking change that
I'd appreciate some support with.  Sorry this is quite a long one; I wanted
to capture as much context as I could, but please shout if there's anything
you'd like to dig into.

I'd note that I've also raised this on StackOverflow, if you find it easier
to read the Markdown there - https://stackoverflow.com/q/71875593/18805546.

I'm using Beam inside GCP Dataflow to read data from BigQuery, then
processing aggregates before I write the results back to BigQuery.  I'm
able to read from/write to BigQuery without issue, but after the upgrade my
pipeline to calculate aggregates is failing at runtime, specifically a
`DoFn` I have written to sanitise the results returned from the Beam
`SqlTransform.query` command.  I call this function within `ParDo.of` to
detect `Double.MAX_VALUE` and `Double.MIN_VALUE` values, as calling MIN/MAX
aggregates in Beam SQL returns the Double min/max values when it encounters
a `NULL` value, rather than just returning NULL.  I did try filtering the
initial BigQuery raw data results, but this issue creeps in at the Beam SQL
level.
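
To make that concrete, the sentinel values that turn up in the aggregate
rows in place of NULL are just the Java numeric extremes.  A standalone
snippet purely for illustration (this class isn't part of my pipeline):

```java
// Standalone illustration: the values that appear in the aggregate rows
// wherever Beam SQL MIN/MAX encountered a NULL, rather than NULL itself.
public class Sentinels {
    public static void main(String[] args) {
        System.out.println(Double.MAX_VALUE); // 1.7976931348623157E308
        System.out.println(Double.MIN_VALUE); // 4.9E-324
        System.out.println(Long.MAX_VALUE);   // 9223372036854775807
        System.out.println(Long.MIN_VALUE);   // -9223372036854775808
    }
}
```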

There may be better ways to do this (I'm open to suggestions!).  I've
included a bunch of code snippets from my pipeline that I've tried to
simplify, so apologies if there's anything obviously janky.  Here's what I
previously had before the library upgrade:

    PCollection<Row> aggregates = inputCollection.apply(
        "Generate Aggregates",
        SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
    )
    .apply(ParDo.of(new HandleNullValues()));

I've included the `HandleNullValues` definition at the bottom of this
email, but it appears v2.21.0 introduced a breaking change whereby coder
inference was disabled for Beam `Row` types in [this ticket](
https://issues.apache.org/jira/browse/BEAM-9569).  This change causes the
above code to fail with the following runtime error:

> [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Unable to return a default Coder for ParDo(HandleNullValues)/ParMultiDo(HandleNullValues).output [PCollection@83398426]. Correct one of the following root causes:
> [ERROR]   No Coder has been manually specified;  you may do so using .setCoder().
> [ERROR]   Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for a Beam Row. Please provide a schema instead using PCollection.setRowSchema.
> [ERROR]   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.

I've followed the advice on the aforementioned JIRA ticket, plus a bunch of
other examples I found online, but without much joy.  I've tried applying
`setCoder(SerializableCoder.of(Row.class))` after the `.apply(ParDo.of(new
HandleNullValues()))`, which fixes this error (though I'm not yet sure
whether it's actually working, or just suppressing the error), but that
change causes another runtime error:

> [ERROR] Failed to execute goal
> org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
> project dataflow-example: An exception occured while executing the
> Java class. Cannot call getSchema when there is no schema -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
> execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
> (default-cli) on project dataflow-example: An exception occured while
> executing the Java class. Cannot call getSchema when there is no
> schema

This error is thrown further down my pipeline, when I perform a subsequent
`SqlTransform.query` to JOIN some results together.

    PCollectionTuple.of(new TupleTag<Row>("Rows"), aggregates)
        .and(new TupleTag<Row>("Experiments"), experiments)
        .apply("Joining Aggregates to Experiments", SqlTransform.query(aggregateExperimentJoin()))
        .apply(ParDo.of(new MapBeamRowsToBigQueryTableRows()))
        .apply(BigQueryIO.writeTableRows()
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .to(NestedValueProvider.of(
                options.getProjectId(),
                (SerializableFunction<String, String>) projectId -> projectId + ":daily_aggregates.experiments")));

I've verified that the `aggregates` collection is indeed missing a schema
by interrogating the `hasSchema` property.  The second `experiments`
PCollection above does have a row schema set, though:

    PCollection<Row> rawExperiments = rows.apply(
        SqlTransform.query("select sessionId, experiments from PCOLLECTION")
    );
    PCollection<Row> experiments = rawExperiments.apply(
        ParDo.of(new CustomFunctions.ParseExperiments(bigQuerySchema))
    );
    experiments.setRowSchema(bigQuerySchema);

I've also tried registering a coder at the pipeline level, with various
combinations of the following, but this gives the same error:

    CoderRegistry cr = pipeline.getCoderRegistry();
    cr.registerCoderForClass(Row.class, RowCoder.of(bigQuerySchema));
    cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(bigQuerySchema));

The `bigQuerySchema` object referenced above is the initial schema used to
retrieve all raw data from BigQuery.  That part of the pipeline works fine,
so perhaps I need to pass the `aggregatesSchema` object (see below) into
`registerCoderForType` for the pipeline instead?

I then tried to set the row schema on `aggregates` (which was another
suggestion in the error above).  I've confirmed that calling `setCoder` is
responsible for the previous `Row` schema disappearing, where it had
previously been set by the input PCollection (and likewise if I call
`setRowSchema` immediately before I call the `DoFn`).

I've simplified the schema for succinctness in this post, but it's a subset
of `bigQuerySchema` with a few new fields (simple data types).  Here's what
I've tried, again with various combinations of where I call `setCoder` and
`setRowSchema` (before `apply()` and/or after).

    Schema aggregatesSchema = Schema.builder()
        .addNullableField("userId", FieldType.STRING)
        .addNullableField("sessionId", FieldType.STRING)
        .addNullableField("experimentsPerDay", FieldType.INT64)
        .build();

    PCollection<Row> aggregates = inputCollection.apply(
        "Generate Aggregates",
        SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
    )
    .apply(ParDo.of(new HandleNullValues()))
    .setCoder(SerializableCoder.of(Row.class))
    .setRowSchema(aggregatesSchema);

Unfortunately, this causes a third runtime error that I've not been able
to figure out:

> [ERROR] Failed to execute goal
> org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
> project dataflow-example: An exception occured while executing the
> Java class. java.lang.IllegalStateException -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
> execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
> (default-cli) on project dataflow-example: An exception occured while
> executing the Java class. java.lang.IllegalStateException

The full call stack is at the bottom of this email, and I can see it
originating from my `HandleNullValues` `DoFn`, but after that it disappears
into the Beam libraries.

I'm at a loss as to which route is recommended, and how to proceed, as both
coder and schema options are causing different issues.

Any help would be greatly appreciated, and thanks for your efforts on this
project!


The full `DoFn` I've referred to is further below, but it's worth noting
that just having an essentially empty `DoFn` with both input and output of
Beam `Row` types causes the same issue:

    public static class HandleNullValues extends DoFn<Row, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Row row = c.element();
            c.output(row);
        }
    }

Here's the full implementation, if anyone can think of a better way to
detect and replace `NULL` values returned from Beam SQL:

    public static class HandleNullValues extends DoFn<Row, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Row row = c.element();
            List<String> fields = row.getSchema().getFieldNames();
            Builder rowBuilder = Row.withSchema(row.getSchema());

            for (String f : fields) {
                Object value = row.getValue(f);
                if (value instanceof Long) {
                    Long longVal = row.getInt64(f);
                    if (longVal == Long.MAX_VALUE || longVal == Long.MIN_VALUE) {
                        rowBuilder.addValue(null);
                    } else {
                        rowBuilder.addValue(value);
                    }
                } else if (value instanceof Double) {
                    Double doubleVal = row.getDouble(f);
                    if (doubleVal == Double.MAX_VALUE || doubleVal == Double.MIN_VALUE) {
                        rowBuilder.addValue(null);
                    } else {
                        rowBuilder.addValue(value);
                    }
                } else {
                    rowBuilder.addValue(value);
                }
            }

            Row newRow = rowBuilder.build();
            c.output(newRow);
        }
    }
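
Incidentally, the per-value check in that `DoFn` boils down to a plain
helper, which can at least be unit tested without spinning up a Beam
pipeline.  A sketch of that idea (`SentinelSanitiser` and `sanitiseValue`
are just names I've picked for illustration, not part of my pipeline):

```java
// Hypothetical standalone version of the per-field logic in HandleNullValues:
// maps the Long/Double extremes (what Beam SQL MIN/MAX emit on NULL input)
// back to null, and passes every other value through untouched.
public class SentinelSanitiser {
    public static Object sanitiseValue(Object value) {
        if (value instanceof Long) {
            long l = (Long) value;
            return (l == Long.MAX_VALUE || l == Long.MIN_VALUE) ? null : value;
        }
        if (value instanceof Double) {
            double d = (Double) value;
            return (d == Double.MAX_VALUE || d == Double.MIN_VALUE) ? null : value;
        }
        return value; // nulls, strings, etc. are left as-is
    }
}
```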

And here's the full call stack from the `setRowSchema` issue detailed above:


> [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException
>     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
>     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
>     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
>     at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
>     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
>     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
> Caused by: org.apache.maven.plugin.MojoExecutionException: An exception occured while executing the Java class. java.lang.IllegalStateException
>     at org.codehaus.mojo.exec.ExecJavaMojo.execute (ExecJavaMojo.java:311)
>     at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:301)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
>     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
>     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
>     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
>     at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
>     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
>     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
>     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
>     at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
> Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:373)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:341)
>     at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:218)
>     at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
>     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
>     at com.example.dataflow.Pipeline.main (Pipeline.java:284)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.IllegalStateException
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:491)
>     at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate (RowCoderGenerator.java:314)
>     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source)
>     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source)
>     at org.apache.beam.sdk.schemas.SchemaCoder.encode (SchemaCoder.java:124)
>     at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:85)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:69)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:54)
>     at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:268)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:416)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:404)
>     at com.example.dataflow.Pipeline$HandleNullValues.processElement (CustomFunctions.java:310)


Cheers!
