Thanks Andrew. Out of interest, does your test pass if all of the input
values to the MIN/MAX aggregates are NULL?  If I have a BigQuery column
that contains entirely NULL values, and I then convert them from TableRow
to Row types, they still show correctly as NULL if I perform a simple
System.out.println:

rowsFromBigQuery.apply(ParDo.of(new PrintParDo()))
    .setCoder(SerializableCoder.of(Row.class));
"NULL"
"NULL"
"NULL"....

If I then apply the following Beam SQL on this PCollection:

select max(experimentValue) as experimentValue
from PCOLLECTION

Then the results come back as -9223372036854775808 (Long.MIN_VALUE), or
9223372036854775807 (Long.MAX_VALUE) if you use MIN().
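
For completeness, this is applied roughly as follows (a sketch on my part;
in the real pipeline the SQL lives in a resource file, but the inline form
is equivalent):

    PCollection<Row> result = rowsFromBigQuery.apply(
        SqlTransform.query("select max(experimentValue) as experimentValue from PCOLLECTION"));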

Hopefully I'm doing something silly and it's an easy fix; let me know if
there's anything you'd like me to try.


Cheers


On Fri, 15 Apr 2022 at 21:20, Andrew Pilloud <apill...@google.com> wrote:

> Are you sure the min/max values are coming from SqlTransform? I wrote a
> quick test in Beam (using Integer, but all types have the same null
> wrapper) and the nulls were dropped.
>
> More detail: I added the following test case
> to BeamSqlDslAggregationNullableTest on the latest Beam. The input values
> for f_int1 are 1, null, 2, null, null, null, 3. The test passed (the
> return value being 1), which indicates we are dropping nulls before
> aggregation. (I don't believe this is actually correct behavior; shouldn't
> we be returning null?)
>
>   @Test
>   public void testMin() {
>     String sql = "SELECT min(f_int1) FROM PCOLLECTION";
>
>     PAssert.that(boundedInput.apply(SqlTransform.query(sql))).satisfies(matchesScalar(1));
>     pipeline.run();
>   }
>
> On Fri, Apr 15, 2022 at 11:37 AM Jimmy Headdon <
> jimmy.head...@mojiworks.com> wrote:
>
>> Thanks for the swift response Brian, Andrew.  I've tried your suggestion
>> Brian, and sadly I get the same error as the lengthy call stack at the
>> end of my original post (IllegalStateException) - it appears the
>> PCollection might have been finalised by the DoFn, and therefore I cannot
>> call setRowSchema against it?  In the fully implemented version I captured
>> in my original post you can see I call withSchema when creating the Row
>> objects, though interestingly the cut-down version I also posted gives the
>> same error, even though it passes the input row to the output without
>> mutating it.
>>
>> Regarding the NULL values from Beam SQL aggregations, I've re-run my
>> pipeline with my NullValueHandler commented out, and unfortunately I can
>> still see min and max integers being written back to BigQuery.  Is there
>> anything you'd like me to test to get you some further feedback?
>>
>> Thanks again!
>>
>> On Fri, 15 Apr 2022 at 18:37, Andrew Pilloud <apill...@google.com> wrote:
>>
>>> Beam SQL's null aggregation behavior changed radically in 2.34.0.
>>> https://github.com/apache/beam/pull/15174
>>>
>>> I think we drop them now?
>>>
>>> Andrew
>>>
>>>
>>> On Fri, Apr 15, 2022 at 10:17 AM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Hi Jimmy,
>>>>
>>>> Sorry about this; I wonder if this error message could be more helpful?
>>>> You're right that the issue is that the output PCollection produced by
>>>> HandleNullValues doesn't have a schema attached to it. Beam has no way of
>>>> inferring the output schema through the opaque DoFn. A quick solution might
>>>> be to just propagate the schema from the SQL output:
>>>>
>>>>     PCollection<Row> sqlOutput = inputCollection.apply(
>>>>         "Generate Aggregates",
>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>     );
>>>>
>>>>     PCollection<Row> aggregates = sqlOutput
>>>>         .apply(ParDo.of(new HandleNullValues()))
>>>>         .setRowSchema(sqlOutput.getSchema());
>>>>
>>>> @Reuven Lax <re...@google.com> may have some other ideas.
>>>>
>>>> Stepping back to the reason you need to add HandleNullValues: "I call
>>>> this function within `ParDo.of` to detect `Double.MAX_VALUE` and
>>>> `Double.MIN_VALUE` values, as calling MIN/MAX aggregates in Beam SQL
>>>> returns the Double min/max values when it encounters a `NULL` value, rather
>>>> than just returning NULL."
>>>> @Andrew Pilloud <apill...@google.com> is this intended? Do you know if
>>>> there's any way to modify this behavior?
>>>>
>>>> Brian
>>>>
>>>> On Fri, Apr 15, 2022 at 1:14 AM Jimmy Headdon <
>>>> jimmy.head...@mojiworks.com> wrote:
>>>>
>>>>> Hello
>>>>>
>>>>> I'm attempting to upgrade the Apache Beam libraries from v2.19.0 to
>>>>> v2.37.0 (Java 8 & Maven), but have run into an issue with a breaking
>>>>> change that I'd appreciate some support with.  Sorry this is quite a long
>>>>> one; I wanted to capture as much context as I could, but please shout if
>>>>> there's anything you'd like to dig into.
>>>>>
>>>>> I'd note that I've also raised this on StackOverflow, if you find it
>>>>> easier to read the Markdown there -
>>>>> https://stackoverflow.com/q/71875593/18805546.
>>>>>
>>>>> I'm using Beam inside GCP Dataflow to read data from BigQuery, then
>>>>> computing aggregates before I write the results back to BigQuery.  I'm
>>>>> able to read from/write to BigQuery without issue, but after the upgrade
>>>>> my pipeline to calculate aggregates is failing at runtime, specifically in
>>>>> a `DoFn` I have written to sanitise the results returned from the Beam
>>>>> `SqlTransform.query` command.  I call this function within `ParDo.of` to
>>>>> detect `Double.MAX_VALUE` and `Double.MIN_VALUE` values, as calling
>>>>> MIN/MAX aggregates in Beam SQL returns the Double min/max values when it
>>>>> encounters a `NULL` value, rather than just returning NULL.  I did try
>>>>> filtering the initial BigQuery raw data results, but this issue creeps in
>>>>> at the Beam SQL level.
>>>>>
>>>>> There may be better ways to do this (I'm open to suggestions!).  I've
>>>>> included a bunch of code snippets from my pipeline that I've tried to
>>>>> simplify, so apologies if there's anything obviously janky.  Here's what I
>>>>> previously had before the library upgrade:
>>>>>
>>>>>     PCollection<Row> aggregates = inputCollection.apply(
>>>>>         "Generate Aggregates",
>>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>     )
>>>>>     .apply(ParDo.of(new HandleNullValues()));
>>>>>
>>>>> I've included the `HandleNullValues` definition at the bottom of this
>>>>> email, but it appears v2.21.0 introduced a breaking change whereby the
>>>>> coder inference was disabled for Beam Row types in [this ticket](
>>>>> https://issues.apache.org/jira/browse/BEAM-9569).  This change has
>>>>> caused the above code to fail with the following runtime error:
>>>>>
>>>>> > [ERROR] Failed to execute goal
>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
>>>>> > project dataflow-example: An exception occured while executing the
>>>>> > Java class. Unable to return a default Coder for
>>>>> > ParDo(HandleNullValues)/ParMultiDo(HandleNullValues).output
>>>>> > [PCollection@83398426]. Correct one of the following root causes:
>>>>> > [ERROR]   No Coder has been manually specified;  you may do so using
>>>>> > .setCoder(). [ERROR]   Inferring a Coder from the CoderRegistry
>>>>> > failed: Cannot provide a coder for a Beam Row. Please provide a
>>>>> schema
>>>>> > instead using PCollection.setRowSchema. [ERROR]   Using the default
>>>>> > output Coder from the producing PTransform failed:
>>>>> > PTransform.getOutputCoder called.
>>>>>
>>>>> I've followed the advice on the aforementioned JIRA ticket, plus a
>>>>> bunch of other examples I found online, but without much joy.  I've tried
>>>>> applying `setCoder(SerializableCoder.of(Row.class))` after the
>>>>> `.apply(ParDo.of(new HandleNullValues()))`, which fixes this error (though
>>>>> I'm not yet sure if it's just suppressed the error, or if it's actually
>>>>> working).
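>>>>>
>>>>> The change looks like this (the same snippet as above, plus the coder):
>>>>>
>>>>>     PCollection<Row> aggregates = inputCollection.apply(
>>>>>         "Generate Aggregates",
>>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>     )
>>>>>     .apply(ParDo.of(new HandleNullValues()))
>>>>>     .setCoder(SerializableCoder.of(Row.class));
>>>>>
>>>>> That change, however, causes another runtime error: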
>>>>>
>>>>> > [ERROR] Failed to execute goal
>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
>>>>> > project dataflow-example: An exception occured while executing the
>>>>> > Java class. Cannot call getSchema when there is no schema -> [Help 1]
>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
>>>>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
>>>>> > (default-cli) on project dataflow-example: An exception occured while
>>>>> > executing the Java class. Cannot call getSchema when there is no
>>>>> > schema
>>>>>
>>>>> This error is thrown further down my pipeline, when I perform a
>>>>> subsequent `SqlTransform.query` to JOIN some results together.
>>>>>
>>>>>     PCollectionTuple.of(new TupleTag<Row>("Rows"), aggregates)
>>>>>         .and(new TupleTag<Row>("Experiments"), experiments)
>>>>>         .apply("Joining Aggregates to Experiments", SqlTransform.query(aggregateExperimentJoin()))
>>>>>         .apply(ParDo.of(new MapBeamRowsToBigQueryTableRows()))
>>>>>         .apply(BigQueryIO.writeTableRows()
>>>>>             .withCreateDisposition(CreateDisposition.CREATE_NEVER)
>>>>>             .withWriteDisposition(WriteDisposition.WRITE_APPEND)
>>>>>             .to(NestedValueProvider.of(
>>>>>                 options.getProjectId(),
>>>>>                 (SerializableFunction<String, String>) projectId -> projectId + ":daily_aggregates.experiments")));
>>>>>
>>>>> I've verified the `aggregates` collection is indeed missing a schema
>>>>> if I interrogate the `hasSchema` property.  The second `experiments`
>>>>> PCollection above does have a row schema set though:
>>>>>
>>>>>     PCollection<Row> rawExperiments = rows.apply(
>>>>>         SqlTransform.query("select sessionId, experiments from PCOLLECTION")
>>>>>     );
>>>>>     PCollection<Row> experiments = rawExperiments.apply(ParDo.of(new CustomFunctions.ParseExperiments(bigQuerySchema)));
>>>>>     experiments.setRowSchema(bigQuerySchema);
>>>>>
>>>>> I've also tried applying this coder at the pipeline level, with
>>>>> different variations on the following, but this also gives the same error:
>>>>>
>>>>>     CoderRegistry cr = pipeline.getCoderRegistry();
>>>>>     cr.registerCoderForClass(Row.class, RowCoder.of(bigQuerySchema));
>>>>>     cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(bigQuerySchema));
>>>>>
>>>>> The `bigQuerySchema` object referenced above is the initial schema
>>>>> used to retrieve all raw data from BigQuery, though that part of the
>>>>> pipeline works fine, so potentially I need to pass the `aggregatesSchema`
>>>>> object (see below) into `registerCoderForType` for the pipeline?
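>>>>>
>>>>> i.e. something like this (an untested sketch on my part, reusing the
>>>>> `aggregatesSchema` definition from further below):
>>>>>
>>>>>     cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(aggregatesSchema));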
>>>>>
>>>>> I then tried to set the row schema on `aggregates` (which was another
>>>>> suggestion in the error above).  I've confirmed that calling `setCoder` is
>>>>> responsible for the previous `Row` schema disappearing, where it had
>>>>> previously been set by the input PCollection (and it disappears even if I
>>>>> call `setRowSchema` immediately before the `DoFn`).
>>>>>
>>>>> I've simplified the schema for succinctness in this post, but it's a
>>>>> subset of `bigQuerySchema` with a few new fields (simple data types).
>>>>> Here's what I've tried, again with various combinations of where I call
>>>>> `setCoder` and `setRowSchema` (before `apply()` and/or after).
>>>>>
>>>>>     Schema aggregatesSchema = Schema.builder()
>>>>>         .addNullableField("userId", FieldType.STRING)
>>>>>         .addNullableField("sessionId", FieldType.STRING)
>>>>>         .addNullableField("experimentsPerDay", FieldType.INT64)
>>>>>         .build();
>>>>>
>>>>>     PCollection<Row> aggregates = inputCollection.apply(
>>>>>         "Generate Aggregates",
>>>>>         SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
>>>>>     )
>>>>>     .apply(ParDo.of(new HandleNullValues()))
>>>>>     .setCoder(SerializableCoder.of(Row.class))
>>>>>     .setRowSchema(aggregatesSchema);
>>>>>
>>>>> Unfortunately, this causes a third runtime error which I've not been
>>>>> able to figure out:
>>>>>
>>>>> > [ERROR] Failed to execute goal
>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
>>>>> > project dataflow-example: An exception occured while executing the
>>>>> > Java class. java.lang.IllegalStateException -> [Help 1]
>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
>>>>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
>>>>> > (default-cli) on project dataflow-example: An exception occured while
>>>>> > executing the Java class. java.lang.IllegalStateException
>>>>>
>>>>> The full call stack is at the bottom of this email, and I can see it
>>>>> originating from my `HandleNullValues` `DoFn`, but after that it 
>>>>> disappears
>>>>> into the Beam libraries.
>>>>>
>>>>> I'm at a loss as to which route is recommended, and how to proceed, as
>>>>> both coder and schema options are causing different issues.
>>>>>
>>>>> Any help would be greatly appreciated, and thanks for your efforts on
>>>>> this project!
>>>>>
>>>>>
>>>>> The full `DoFn` I've referred to is further below, but it's worth
>>>>> noting that just having an essentially empty `DoFn` with both input and
>>>>> output of Beam `Row` types causes the same issue:
>>>>>
>>>>>     public static class HandleNullValues extends DoFn<Row, Row> {
>>>>>         @ProcessElement
>>>>>         public void processElement(ProcessContext c) {
>>>>>             Row row = c.element();
>>>>>             c.output(row);
>>>>>         }
>>>>>     }
>>>>>
>>>>> Here's the full implementation, if anyone can think of a better way to
>>>>> detect and replace `NULL` values returned from Beam SQL:
>>>>>
>>>>>     public static class HandleNullValues extends DoFn<Row, Row> {
>>>>>         @ProcessElement
>>>>>         public void processElement(ProcessContext c) {
>>>>>             Row row = c.element();
>>>>>             List<String> fields = row.getSchema().getFieldNames();
>>>>>             Builder rowBuilder = Row.withSchema(row.getSchema());
>>>>>
>>>>>             for (String f : fields) {
>>>>>                 Object value = row.getValue(f);
>>>>>                 if (value instanceof Long) {
>>>>>                     Long longVal = row.getInt64(f);
>>>>>                     if (longVal == Long.MAX_VALUE || longVal == Long.MIN_VALUE) {
>>>>>                         rowBuilder.addValue(null);
>>>>>                     } else {
>>>>>                         rowBuilder.addValue(value);
>>>>>                     }
>>>>>                 } else if (value instanceof Double) {
>>>>>                     Double doubleVal = row.getDouble(f);
>>>>>                     if (doubleVal == Double.MAX_VALUE || doubleVal == Double.MIN_VALUE) {
>>>>>                         rowBuilder.addValue(null);
>>>>>                     } else {
>>>>>                         rowBuilder.addValue(value);
>>>>>                     }
>>>>>                 } else {
>>>>>                     rowBuilder.addValue(value);
>>>>>                 }
>>>>>             }
>>>>>
>>>>>             Row newRow = rowBuilder.build();
>>>>>             c.output(newRow);
>>>>>         }
>>>>>     }
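>>>>>
>>>>> One variant I've been considering (an untested sketch on my part, with
>>>>> the same imports as above) drives the check off each field's declared
>>>>> schema type rather than `instanceof`, which makes the unboxing explicit:
>>>>>
>>>>>     public static class HandleNullValues extends DoFn<Row, Row> {
>>>>>         @ProcessElement
>>>>>         public void processElement(ProcessContext c) {
>>>>>             Row row = c.element();
>>>>>             Builder rowBuilder = Row.withSchema(row.getSchema());
>>>>>             for (Schema.Field field : row.getSchema().getFields()) {
>>>>>                 String name = field.getName();
>>>>>                 Schema.TypeName type = field.getType().getTypeName();
>>>>>                 if (type == Schema.TypeName.INT64) {
>>>>>                     // Replace the Long sentinel values with an explicit null.
>>>>>                     Long v = row.getInt64(name);
>>>>>                     boolean sentinel = v != null && (v == Long.MAX_VALUE || v == Long.MIN_VALUE);
>>>>>                     rowBuilder.addValue(sentinel ? null : v);
>>>>>                 } else if (type == Schema.TypeName.DOUBLE) {
>>>>>                     // Same treatment for the Double sentinel values.
>>>>>                     Double v = row.getDouble(name);
>>>>>                     boolean sentinel = v != null && (v == Double.MAX_VALUE || v == Double.MIN_VALUE);
>>>>>                     rowBuilder.addValue(sentinel ? null : v);
>>>>>                 } else {
>>>>>                     rowBuilder.addValue(row.getValue(name));
>>>>>                 }
>>>>>             }
>>>>>             c.output(rowBuilder.build());
>>>>>         }
>>>>>     }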
>>>>>
>>>>> And here's the full call stack from the `setRowSchema` issue detailed
>>>>> above:
>>>>>
>>>>>
>>>>> > [ERROR] Failed to execute goal
>>>>> > org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on
>>>>> > project dataflow-example: An exception occured while executing the
>>>>> > Java class. java.lang.IllegalStateException -> [Help 1]
>>>>> > org.apache.maven.lifecycle.LifecycleExecutionException: Failed to
>>>>> > execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java
>>>>> > (default-cli) on project dataflow-example: An exception occured while
>>>>> > executing the Java class. java.lang.IllegalStateException
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
>>>>> (MojoExecutor.java:306)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:211)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:165)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:157)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:121)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:81)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>>>>> > (SingleThreadedBuilder.java:56)
>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>>>>> (LifecycleStarter.java:127)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:294)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:192)
>>>>> >     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke
>>>>> (NativeMethodAccessorImpl.java:62)
>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke
>>>>> (DelegatingMethodAccessorImpl.java:43)
>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
>>>>> > (Launcher.java:282)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
>>>>> (Launcher.java:225)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
>>>>> > (Launcher.java:406)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main
>>>>> (Launcher.java:347) Caused by:
>>>>> > org.apache.maven.plugin.MojoExecutionException: An exception occured
>>>>> > while executing the Java class. java.lang.IllegalStateException
>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo.execute
>>>>> (ExecJavaMojo.java:311)
>>>>> >     at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
>>>>> (DefaultBuildPluginManager.java:137)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute
>>>>> (MojoExecutor.java:301)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:211)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:165)
>>>>> >     at org.apache.maven.lifecycle.internal.MojoExecutor.execute
>>>>> (MojoExecutor.java:157)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:121)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
>>>>> > (LifecycleModuleBuilder.java:81)
>>>>> >     at
>>>>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>>>>> > (SingleThreadedBuilder.java:56)
>>>>> >     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>>>>> (LifecycleStarter.java:127)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:294)
>>>>> >     at org.apache.maven.DefaultMaven.doExecute
>>>>> (DefaultMaven.java:192)
>>>>> >     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
>>>>> >     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
>>>>> >     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
>>>>> >     at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke
>>>>> (NativeMethodAccessorImpl.java:62)
>>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke
>>>>> (DelegatingMethodAccessorImpl.java:43)
>>>>> >     at java.lang.reflect.Method.invoke (Method.java:498)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
>>>>> > (Launcher.java:282)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.launch
>>>>> (Launcher.java:225)
>>>>> >     at
>>>>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
>>>>> > (Launcher.java:406)
>>>>> >     at org.codehaus.plexus.classworlds.launcher.Launcher.main
>>>>> (Launcher.java:347) Caused by:
>>>>> > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>>> > java.lang.IllegalStateException
>>>>> >     at
>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
>>>>> > (DirectRunner.java:373)
>>>>> >     at
>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
>>>>> > (DirectRunner.java:341)
>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run
>>>>> (DirectRunner.java:218)
>>>>> >     at org.apache.beam.runners.direct.DirectRunner.run
>>>>> (DirectRunner.java:67)
>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
>>>>> >     at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
>>>>> >     at com.example.dataflow.Pipeline.main (Pipeline.java:284)
>>>>> >     at org.codehaus.mojo.exec.ExecJavaMojo$1.run
>>>>> (ExecJavaMojo.java:254)
>>>>> >     at java.lang.Thread.run (Thread.java:748) Caused by:
>>>>> java.lang.IllegalStateException
>>>>> >     at
>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>>>>> > (Preconditions.java:491)
>>>>> >     at
>>>>> org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate
>>>>> > (RowCoderGenerator.java:314)
>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
>>>>> (Unknown Source)
>>>>> >     at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode
>>>>> (Unknown Source)
>>>>> >     at org.apache.beam.sdk.schemas.SchemaCoder.encode
>>>>> (SchemaCoder.java:124)
>>>>> >     at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream
>>>>> (CoderUtils.java:85)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
>>>>> (CoderUtils.java:69)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
>>>>> (CoderUtils.java:54)
>>>>> >     at org.apache.beam.sdk.util.CoderUtils.clone
>>>>> (CoderUtils.java:144)
>>>>> >     at
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>
>>>>> > (MutationDetectors.java:118)
>>>>> >     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder
>>>>> (MutationDetectors.java:49)
>>>>> >     at
>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add
>>>>> > (ImmutabilityCheckingBundleFactory.java:115)
>>>>> >     at
>>>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output
>>>>> > (ParDoEvaluator.java:305)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue
>>>>> > (SimpleDoFnRunner.java:268)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900
>>>>> > (SimpleDoFnRunner.java:84)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
>>>>> > (SimpleDoFnRunner.java:416)
>>>>> >     at
>>>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output
>>>>> > (SimpleDoFnRunner.java:404)
>>>>> >     at com.example.dataflow.Pipeline$HandleNullValues.processElement
>>>>> (CustomFunctions.java:310)
>>>>>
>>>>>
>>>>> Cheers!
>>>>>
>>>>
