Some more context - the problem is that RenameFields outputs (in this case)
Java Row objects that are inconsistent with the actual schema. For example,
if you have the following schema:

Row {
  field1: Row {
    field2: string
  }
}

And rename field1.field2 -> renamed, you'll get the following schema:

Row {
  field1: Row {
    renamed: string
  }
}

However, the Java object for the _nested_ row will return the old schema if
getSchema() is called on it. This is because we only update the schema on
the top-level row.

I think this explains why your test works in the direct runner. If the row
ever goes through an encode/decode path, it will come back correct. The
original incorrect Java objects are no longer around, and new (consistent)
objects are constructed from the raw data and the PCollection schema.
Dataflow tends to fuse ParDos together, so the following ParDo will see the
incorrect Row object. I bet the DirectRunner is encoding and decoding in
between, which fixes the object.
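
To make the mechanism concrete, here is a toy model in plain Java. It is only a sketch: ToySchema, ToyRow, encode and decode are made-up stand-ins for illustration, not Beam's Schema, Row or coder classes. It shows a rename that swaps the schema on the top-level row only, and how a round trip through raw values rebuilds consistent nested rows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NestedSchemaDemo {
    /** Hypothetical stand-in for a schema: field names plus nested schemas (null = string field). */
    public static final class ToySchema {
        final List<String> fieldNames;
        final List<ToySchema> fieldSchemas;

        ToySchema(List<String> fieldNames, List<ToySchema> fieldSchemas) {
            this.fieldNames = fieldNames;
            this.fieldSchemas = fieldSchemas;
        }
    }

    /** Hypothetical stand-in for a row: every row object carries its own schema reference. */
    public static final class ToyRow {
        final ToySchema schema;
        final List<Object> values;

        ToyRow(ToySchema schema, List<Object> values) {
            this.schema = schema;
            this.values = values;
        }
    }

    static final ToySchema OLD_NESTED =
            new ToySchema(Collections.singletonList("field2"), Collections.<ToySchema>singletonList(null));
    static final ToySchema RENAMED_NESTED =
            new ToySchema(Collections.singletonList("renamed"), Collections.<ToySchema>singletonList(null));
    public static final ToySchema RENAMED_TOP =
            new ToySchema(Collections.singletonList("field1"), Collections.singletonList(RENAMED_NESTED));

    /** Models the bug: the new schema is attached to the top-level row, nested rows are reused as-is. */
    public static ToyRow renameTopLevelOnly() {
        ToyRow nested = new ToyRow(OLD_NESTED, Collections.<Object>singletonList("value"));
        return new ToyRow(RENAMED_TOP, Collections.<Object>singletonList(nested));
    }

    /** Encoding keeps only the raw values; the per-object schemas are dropped. */
    public static List<Object> encode(ToyRow row) {
        List<Object> raw = new ArrayList<>();
        for (Object value : row.values) {
            raw.add(value instanceof ToyRow ? encode((ToyRow) value) : value);
        }
        return raw;
    }

    /** Decoding rebuilds every row, nested ones included, from the collection-level schema. */
    public static ToyRow decode(ToySchema schema, List<?> raw) {
        List<Object> values = new ArrayList<>();
        for (int i = 0; i < raw.size(); i++) {
            ToySchema child = schema.fieldSchemas.get(i);
            values.add(child == null ? raw.get(i) : decode(child, (List<?>) raw.get(i)));
        }
        return new ToyRow(schema, values);
    }

    /** Name of the first field as reported by the nested row object itself. */
    public static String nestedFieldName(ToyRow row) {
        return ((ToyRow) row.values.get(0)).schema.fieldNames.get(0);
    }

    public static void main(String[] args) {
        ToyRow fused = renameTopLevelOnly();
        System.out.println("fused step sees: " + nestedFieldName(fused));
        ToyRow rebuilt = decode(RENAMED_TOP, encode(fused));
        System.out.println("after round trip: " + nestedFieldName(rebuilt));
    }
}
```

In this model, a step that receives the object directly (as in a fused stage) reads the stale nested name, while a step that receives the decoded copy reads the renamed one.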

You can validate this theory by forcing a shuffle after RenameFields using
Reshuffle. It should fix the issue. If it does, let me know and I'll work
on a fix to RenameFields.
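
A rough sketch of that experiment, assuming the Beam Java SDK is on the classpath. The class and method names (ReshuffleAfterRename, forceEncodeDecode) are invented for illustration; Reshuffle.viaRandomKey() and PCollection.setRowSchema() are existing SDK calls. I have not run this against your pipeline, so treat it as a starting point rather than a verified fix.

```java
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ReshuffleAfterRename {
    /**
     * Hypothetical helper: sends the output of RenameFields through a shuffle so
     * that downstream steps receive rows that were encoded and decoded again,
     * rather than the original in-memory objects handed over within a fused stage.
     */
    public static PCollection<Row> forceEncodeDecode(PCollection<Row> renamed) {
        return renamed
                .apply("ReshuffleAfterRename", Reshuffle.<Row>viaRandomKey())
                // Defensive assumption: re-attach the (already renamed) PCollection
                // schema in case coder inference does not carry it across the shuffle.
                .setRowSchema(renamed.getSchema());
    }
}
```

In the test pipeline this would wrap the PCollection returned by the RenameFields step, and the BigQueryIO write would then be applied to the result.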

On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:

> Aha, yes, this is indeed another bug in the transform. The schema is set on
> the top-level Row but not on any nested rows.
>
> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com>
> wrote:
>
>> Thank you everyone for your input.  I believe it will be easiest to
>> respond to all feedback in a single message rather than messages per person.
>>
>>    - NeedsRunner - The tests are run eventually, so obviously all good
>>    on my end.  I was trying to run the smallest subset of test cases possible
>>    and didn't venture beyond `gradle test`.
>>    - Stack Trace - There wasn't any, unfortunately, because no exception
>>    was thrown in the code.  The Beam Row was translated into a BQ TableRow
>>    and an insertion was attempted.  The error "message" was part of the
>>    response JSON that came back as a result of a request against the BQ API.
>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>    field0_1.nestedStringField is what I am looking for.
>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>       - The Beam Schema was as expected with all renames applied.
>>       - The example I provided was heavily stripped down in order to
>>       isolate the problem.  My work example, which is a bit impractical
>>       because it's part of some generic tooling, has 4 levels of nesting
>>       and also produces the correct output.
>>       - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>       DirectRunner.  In DataflowRunner, however, only the top-level renames
>>       were reflected in the TableRow; none of the renames in the nested
>>       fields were.
>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>       uses the Row.schema to get the field names.  This makes sense to me,
>>       but if a value is actually a Row then its schema appears to be
>>       inconsistent with the top-level schema.
>>    - My Current Workaround - I forked RenameFields and replaced the
>>    attachValues in expand method to be a "deep" rename.  This is obviously
>>    inefficient and I will not be submitting a PR for that.
>>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>
>>
>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>
>>> This transform is the same across all runners. A few comments on the
>>> test:
>>>
>>>   - Using attachValues directly is error prone (per the comment on the
>>> method). I recommend using the withFieldValue builders instead.
>>>   - I recommend capturing the RenameFields PCollection into a local
>>> variable of type PCollection<Row> and printing out the schema (which you
>>> can get using the PCollection.getSchema method) to ensure that the output
>>> schema looks like you expect.
>>>   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>> nestedStringField results in field0_1.nestedStringField; if you wanted to
>>> flatten, then the better transform would be
>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>
>>> This all being said, eyeballing the implementation of RenameFields makes
>>> me think that it is buggy in the case where you specify a top-level field
>>> multiple times like you do. I think it is simply adding the top-level field
>>> into the output schema multiple times, and the second time is with the
>>> field0_1 base name; I have no idea why your test doesn't catch this in the
>>> DirectRunner, as it's equally broken there. Could you file a JIRA about
>>> this issue and assign it to me?
>>>
>>> Reuven
>>>
>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Matthew,
>>>>>
>>>>> > The unit tests also seem to be disabled for this as well and so I
>>>>> don’t know if the PTransform behaves as expected.
>>>>>
>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>>> framework. NeedsRunner indicates that a test suite can't be executed with
>>>>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>>>>> don't run the test when we're verifying the SDK by itself in the
>>>>> :sdks:java:core:test task. The test is still run in other tasks where we
>>>>> have a runner, most notably in the Java PreCommit [1], where we run it as
>>>>> part of the :runners:direct-java:test task.
>>>>>
>>>>> That being said, we may only run these tests continuously with the
>>>>> DirectRunner, I'm not sure if we test them on all the runners like we do
>>>>> with ValidatesRunner tests.
>>>>>
>>>>
>>>> That is correct. The tests are tests _of the transform_ so they run
>>>> only on the DirectRunner. They are not tests of the runner, which is only
>>>> responsible for correctly implementing Beam's primitives. The transform
>>>> should not behave differently on different runners, except for fundamental
>>>> differences in how they schedule work and checkpoint.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>> > The error message I’m receiving, "Error while reading data, error
>>>>> message: JSON parsing error in row starting at position 0: No such field:
>>>>> nestedField.field1_0", suggests that BigQuery is trying to use the
>>>>> original name for the nested field and not the substitute name.
>>>>>
>>>>> Is there a stacktrace associated with this error? It would be helpful
>>>>> to see where the error is coming from.
>>>>>
>>>>> Brian
>>>>>
>>>>>
>>>>> [1]
>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>
>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>
>>>>>> I’m trying to use the RenameFields transform prior to inserting into
>>>>>> BigQuery on nested fields.  Insertion into BigQuery is successful with
>>>>>> DirectRunner, but DataflowRunner has an issue with renamed nested
>>>>>> fields.  The error message I’m receiving, "Error while reading data,
>>>>>> error message: JSON parsing error in row starting at position 0: No
>>>>>> such field: nestedField.field1_0", suggests that BigQuery is trying to
>>>>>> use the original name for the nested field and not the substitute name.
>>>>>>
>>>>>> The code for RenameFields seems simple enough, but does it behave
>>>>>> differently in different runners?  Will a deep attachValues be
>>>>>> necessary in order to get the nested renames to work across all
>>>>>> runners?  Is there something wrong in my code?
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>
>>>>>> The unit tests also seem to be disabled for this as well and so I
>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>
>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>
>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>
>>>>>>> import java.io.File;
>>>>>>> import java.util.Arrays;
>>>>>>> import java.util.HashSet;
>>>>>>> import java.util.stream.Collectors;
>>>>>>>
>>>>>>> import static java.util.Arrays.asList;
>>>>>>>
>>>>>>> public class BQRenameFields {
>>>>>>>     public static void main(String[] args) {
>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>         DataflowPipelineOptions options =
>>>>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>         options.setFilesToStage(
>>>>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>                         .map(entry -> (new File(entry)).toString())
>>>>>>>                         .collect(Collectors.toList()));
>>>>>>>
>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>
>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>                 .build();
>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>                 .build();
>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>                 "value0_0",
>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>                 options.getRunner().toString());
>>>>>>>         pipeline
>>>>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>                 .apply(RenameFields.<Row>create()
>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>                 .apply(BigQueryIO.<Row>write()
>>>>>>>                         .to(new TableReference()
>>>>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>                                 .setDatasetId("prototypes")
>>>>>>>                                 .setTableId("matto_renameFields"))
>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>                         .useBeamSchema());
>>>>>>>         pipeline.run();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>
