I don't think this bug is schema specific - we created a Java object that
is inconsistent with its encoded form, which could happen to any transform.

This does seem to be a gap in DirectRunner testing though. It also makes it
hard to test using PAssert, as I believe that puts everything in a side
input, forcing an encoding/decoding.
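
To make the failure mode concrete, here is a rough, self-contained sketch.
It is a toy model and not Beam code: ToySchema, ToyRow, shallowRename,
encode, decode and leafNames are all made-up names standing in for the real
Row/Schema/coder machinery. The assumption is only that a rename swaps the
top-level schema while reusing the nested row objects, and that a consumer
trusts each row's own schema when it reads field names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NestedSchemaSketch {

    /** Toy schema: field names plus, per field, a nested schema (null for a leaf). */
    static final class ToySchema {
        final List<String> names;
        final List<ToySchema> children;

        ToySchema(List<String> names, List<ToySchema> children) {
            this.names = names;
            this.children = children;
        }
    }

    /** Toy row: values plus the schema object this particular row reports. */
    static final class ToyRow {
        final ToySchema schema;
        final List<Object> values;

        ToyRow(ToySchema schema, List<Object> values) {
            this.schema = schema;
            this.values = values;
        }
    }

    /** Models the buggy rename: swaps the top-level schema, reuses nested rows as-is. */
    static ToyRow shallowRename(ToyRow row, ToySchema renamed) {
        return new ToyRow(renamed, row.values);
    }

    /** Encoding keeps only raw values; nested rows become nested lists. */
    static List<Object> encode(ToyRow row) {
        List<Object> raw = new ArrayList<>();
        for (Object v : row.values) {
            raw.add(v instanceof ToyRow ? encode((ToyRow) v) : v);
        }
        return raw;
    }

    /** Decoding rebuilds every row, nested ones included, from the given schema. */
    @SuppressWarnings("unchecked")
    static ToyRow decode(ToySchema schema, List<Object> raw) {
        List<Object> values = new ArrayList<>();
        for (int i = 0; i < raw.size(); i++) {
            ToySchema child = schema.children.get(i);
            values.add(child == null ? raw.get(i) : decode(child, (List<Object>) raw.get(i)));
        }
        return new ToyRow(schema, values);
    }

    /** Walks a row the way a consumer would, trusting each row's own schema. */
    static List<String> leafNames(ToyRow row, String prefix) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < row.values.size(); i++) {
            String name = prefix + row.schema.names.get(i);
            Object v = row.values.get(i);
            if (v instanceof ToyRow) {
                out.addAll(leafNames((ToyRow) v, name + "."));
            } else {
                out.add(name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<ToySchema> leaf = Collections.<ToySchema>singletonList(null);
        ToySchema oldInner = new ToySchema(Arrays.asList("field2"), leaf);
        ToySchema newInner = new ToySchema(Arrays.asList("renamed"), leaf);
        ToySchema newOuter = new ToySchema(Arrays.asList("field1"), Arrays.asList(newInner));
        ToySchema oldOuter = new ToySchema(Arrays.asList("field1"), Arrays.asList(oldInner));

        ToyRow inner = new ToyRow(oldInner, Arrays.<Object>asList("a"));
        ToyRow outer = new ToyRow(oldOuter, Arrays.<Object>asList(inner));

        // "Fused" path: the next step sees the Java object directly.
        ToyRow fused = shallowRename(outer, newOuter);
        System.out.println(leafNames(fused, ""));        // [field1.field2]

        // Path with an encode/decode in between: the object is rebuilt consistently.
        ToyRow roundTripped = decode(newOuter, encode(fused));
        System.out.println(leafNames(roundTripped, "")); // [field1.renamed]
    }
}
```

Nothing in the sketch is specific to schemas as such; any step that hands
out an object disagreeing with its encoded form would be masked the same way
by a round trip through the coder.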

On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:

> +dev <d...@beam.apache.org>
>
> > I bet the DirectRunner is encoding and decoding in between, which fixes
> the object.
>
> Do we need better testing of schema-aware (and potentially other built-in)
> transforms in the face of fusion to root out issues like this?
>
> Brian
>
> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com>
> wrote:
>
>> I have some other work-related things I need to do this week, so I will
>> likely report back on this over the weekend.  Thank you for the
>> explanation.  It makes perfect sense now.
>>
>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Some more context - the problem is that RenameFields outputs (in this
>>> case) Java Row objects that are inconsistent with the actual schema.
>>> For example if you have the following schema:
>>>
>>> Row {
>>>    field1: Row {
>>>       field2: string
>>>     }
>>> }
>>>
>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>
>>> Row {
>>>   field1: Row {
>>>      renamed: string
>>>    }
>>> }
>>>
>>> However the Java object for the _nested_ row will return the old schema
>>> if getSchema() is called on it. This is because we only update the schema
>>> on the top-level row.
>>>
>>> I think this explains why your test works in the direct runner. If the
>>> row ever goes through an encode/decode path, it will come back correct. The
>>> original incorrect Java objects are no longer around, and new (consistent)
>>> objects are constructed from the raw data and the PCollection schema.
>>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>>> between, which fixes the object.
>>>
>>> You can validate this theory by forcing a shuffle after RenameFields
>>> using Reshuffle. It should fix the issue. If it does, let me know and I'll
>>> work on a fix to RenameFields.
>>>
>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Aha, yes, this is indeed another bug in the transform. The schema is set on
>>>> the top-level Row but not on any nested rows.
>>>>
>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>>> respond to all feedback in a single message rather than messages per 
>>>>> person.
>>>>>
>>>>>    - NeedsRunner - The tests are run eventually, so obviously all
>>>>>    good on my end.  I was trying to run the smallest subset of test cases
>>>>>    possible and didn't venture beyond `gradle test`.
>>>>>    - Stack Trace - There wasn't any, unfortunately, because no
>>>>>    exception was thrown in the code.  The Beam Row was translated into a BQ
>>>>>    TableRow and an insertion was attempted.  The error "message" was part
>>>>>    of the response JSON that came back as a result of a request against
>>>>>    the BQ API.
>>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>    field0_1.nestedStringField is what I am looking for.
>>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>       - The Beam Schema was as expected with all renames applied.
>>>>>       - The example I provided was heavily stripped down in order to
>>>>>       isolate the problem.  My work example, which is a bit impractical
>>>>>       because it's part of some generic tooling, has 4 levels of nesting
>>>>>       and also produces the correct output.
>>>>>       - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>>>>>       in DirectRunner.  In DataflowRunner, however, only the top-level
>>>>>       renames were reflected in the TableRow; the renames in the nested
>>>>>       fields weren't.
>>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>>>>       uses the Row.schema to get the field names.  This makes sense to
>>>>>       me, but if a value is actually a Row then its schema appears to be
>>>>>       inconsistent with the top-level schema.
>>>>>    - My Current Workaround - I forked RenameFields and replaced the
>>>>>    attachValues call in the expand method with a "deep" rename.  This is
>>>>>    obviously inefficient and I will not be submitting a PR for that.
>>>>>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>
>>>>>
>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> This transform is the same across all runners. A few comments on the
>>>>>> test:
>>>>>>
>>>>>>   - Using attachValues directly is error prone (per the comment on
>>>>>> the method). I recommend using the withFieldValue builders instead.
>>>>>>   - I recommend capturing the RenameFields PCollection into a local
>>>>>> variable of type PCollection<Row> and printing out the schema (which you
>>>>>> can get using the PCollection.getSchema method) to ensure that the output
>>>>>> schema looks like you expect.
>>>>>>    - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>>>>> nestedStringField results in field0_1.nestedStringField; if you wanted to
>>>>>> flatten, then the better transform would be
>>>>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>
>>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>>> makes me think that it is buggy in the case where you specify a top-level
>>>>>> field multiple times like you do. I think it is simply adding the
>>>>>> top-level field into the output schema multiple times, and the second
>>>>>> time is with the field0_1 base name; I have no idea why your test doesn't
>>>>>> catch this in the DirectRunner, as it's equally broken there. Could you
>>>>>> file a JIRA about this issue and assign it to me?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Matthew,
>>>>>>>>
>>>>>>>> > The unit tests also seem to be disabled for this as well and so I
>>>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>>>
>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>>>>>> framework. NeedsRunner indicates that a test suite can't be executed
>>>>>>>> with the SDK alone; it needs a runner. So that exclusion just makes sure
>>>>>>>> we don't run the test when we're verifying the SDK by itself in the
>>>>>>>> :sdks:java:core:test task. The test is still run in other tasks where
>>>>>>>> we have a runner, most notably in the Java PreCommit [1], where we run
>>>>>>>> it as part of the :runners:direct-java:test task.
>>>>>>>>
>>>>>>>> That being said, we may only run these tests continuously with the
>>>>>>>> DirectRunner; I'm not sure if we test them on all the runners like we
>>>>>>>> do with ValidatesRunner tests.
>>>>>>>>
>>>>>>>
>>>>>>> That is correct. The tests are tests _of the transform_ so they run
>>>>>>> only on the DirectRunner. They are not tests of the runner, which is
>>>>>>> only responsible for correctly implementing Beam's primitives. The
>>>>>>> transform should not behave differently on different runners, except for
>>>>>>> fundamental differences in how they schedule work and checkpoint.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>> > The error message I’m receiving, "Error while reading data,
>>>>>>>> error message: JSON parsing error in row starting at position 0: No
>>>>>>>> such field: nestedField.field1_0", suggests that BigQuery is trying to
>>>>>>>> use the original name for the nested field and not the substitute name.
>>>>>>>>
>>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>
>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I’m trying to use the RenameFields transform on nested fields prior
>>>>>>>>> to inserting into BigQuery.  Insertion into BigQuery is successful
>>>>>>>>> with DirectRunner, but DataflowRunner has an issue with renamed nested
>>>>>>>>> fields.  The error message I’m receiving, "Error while reading data,
>>>>>>>>> error message: JSON parsing error in row starting at position 0: No
>>>>>>>>> such field: nestedField.field1_0", suggests that BigQuery is trying to
>>>>>>>>> use the original name for the nested field and not the substitute
>>>>>>>>> name.
>>>>>>>>>
>>>>>>>>> The code for RenameFields seems simple enough, but does it behave
>>>>>>>>> differently in different runners?  Will a deep attachValues be
>>>>>>>>> necessary in order to get the nested renames to work across all
>>>>>>>>> runners?  Is there something wrong in my code?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>
>>>>>>>>> The unit tests also seem to be disabled for this as well and so I
>>>>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>
>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>
>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>
>>>>>>>>>> import java.io.File;
>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>
>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>
>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>>>                         .map(entry -> (new File(entry)).toString())
>>>>>>>>>>                         .collect(Collectors.toList()));
>>>>>>>>>>
>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>
>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>                 .build();
>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>                 .build();
>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>                 "value0_0",
>>>>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>                 options.getRunner().toString());
>>>>>>>>>>         pipeline
>>>>>>>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>                 .apply(RenameFields.<Row>create()
>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>                 .apply(BigQueryIO.<Row>write()
>>>>>>>>>>                         .to(new TableReference()
>>>>>>>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>                                 .setDatasetId("prototypes")
>>>>>>>>>>                                 .setTableId("matto_renameFields"))
>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>         pipeline.run();
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>
