There is no bug in the Coder itself, so that wouldn't catch it. We could
insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
the Direct runner already does an encode/decode before that ParDo, then
that would have fixed the problem before we could see it.
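
A minimal sketch of why an intervening encode/decode hides the problem, using
a toy model. ToySchema, ToyRow, shallowRename, encode and decode are
hypothetical stand-ins invented for illustration; they are not Beam's Row,
Schema or Coder classes, and the real implementation differs. The assumption
is only what the thread states: the rename replaces the schema on the
top-level row, and decoding rebuilds rows from raw values plus the
collection-level schema.

```java
import java.util.ArrayList;
import java.util.List;

public class EncodeDecodeMasksBug {
    // Hypothetical stand-in for a schema: field names, plus a nested schema for row-typed fields.
    static final class ToySchema {
        final String[] names;
        final ToySchema[] nested; // nested[i] is null when field i is not a row
        ToySchema(String[] names, ToySchema[] nested) { this.names = names; this.nested = nested; }
    }

    // Hypothetical stand-in for a row: values plus the schema object the row itself carries.
    static final class ToyRow {
        final ToySchema schema;
        final Object[] values;
        ToyRow(ToySchema schema, Object... values) { this.schema = schema; this.values = values; }
    }

    static final ToySchema OLD_NESTED = new ToySchema(new String[] {"field2"}, new ToySchema[] {null});
    static final ToySchema OLD_TOP = new ToySchema(new String[] {"field1"}, new ToySchema[] {OLD_NESTED});
    static final ToySchema NEW_NESTED = new ToySchema(new String[] {"renamed"}, new ToySchema[] {null});
    static final ToySchema NEW_TOP = new ToySchema(new String[] {"field1"}, new ToySchema[] {NEW_NESTED});

    // Mimics the bug described in the thread: only the top-level row gets the new schema,
    // so nested rows keep pointing at their old schema objects.
    static ToyRow shallowRename(ToyRow row, ToySchema renamed) {
        return new ToyRow(renamed, row.values);
    }

    // Encoding keeps only raw values; the schema objects carried by the rows are dropped.
    static List<Object> encode(ToyRow row) {
        List<Object> out = new ArrayList<>();
        for (Object v : row.values) {
            out.add(v instanceof ToyRow ? encode((ToyRow) v) : v);
        }
        return out;
    }

    // Decoding rebuilds every row, nested ones included, from the collection-level schema.
    @SuppressWarnings("unchecked")
    static ToyRow decode(ToySchema schema, List<Object> data) {
        Object[] values = new Object[data.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = schema.nested[i] != null
                ? decode(schema.nested[i], (List<Object>) data.get(i))
                : data.get(i);
        }
        return new ToyRow(schema, values);
    }

    // Field name that a consumer reading the nested row's own schema would see.
    static String nestedFieldName(ToyRow row) {
        return ((ToyRow) row.values[0]).schema.names[0];
    }

    public static void main(String[] args) {
        ToyRow input = new ToyRow(OLD_TOP, new ToyRow(OLD_NESTED, "value"));
        ToyRow fused = shallowRename(input, NEW_TOP);
        ToyRow roundTripped = decode(NEW_TOP, encode(fused));
        System.out.println("fused consumer sees:      " + nestedFieldName(fused));
        System.out.println("after encode/decode sees: " + nestedFieldName(roundTripped));
    }
}
```

In this toy model the fused consumer reads the stale nested name, while any
consumer placed after an encode/decode only ever sees the rebuilt, consistent
object. A round-trip check inserted after that point has nothing left to
detect.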

On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:

> Would it be caught by CoderProperties?
>
> Kenn
>
> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>
>> I don't think this bug is schema specific - we created a Java object that
>> is inconsistent with its encoded form, which could happen to any transform.
>>
>> This does seem to be a gap in DirectRunner testing though. It also makes
>> it hard to test using PAssert, as I believe that puts everything in a side
>> input, forcing an encoding/decoding.
>>
>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> +dev <d...@beam.apache.org>
>>>
>>> > I bet the DirectRunner is encoding and decoding in between, which
>>> fixes the object.
>>>
>>> Do we need better testing of schema-aware (and potentially other
>>> built-in) transforms in the face of fusion to root out issues like this?
>>>
>>> Brian
>>>
>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com>
>>> wrote:
>>>
>>>> I have some other work-related things I need to do this week, so I will
>>>> likely report back on this over the weekend.  Thank you for the
>>>> explanation.  It makes perfect sense now.
>>>>
>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Some more context - the problem is that RenameFields outputs (in this
>>>>> case) Java Row objects that are inconsistent with the actual schema.
>>>>> For example if you have the following schema:
>>>>>
>>>>> Row {
>>>>>   field1: Row {
>>>>>     field2: string
>>>>>   }
>>>>> }
>>>>>
>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>
>>>>> Row {
>>>>>   field1: Row {
>>>>>     renamed: string
>>>>>   }
>>>>> }
>>>>>
>>>>> However the Java object for the _nested_ row will return the old
>>>>> schema if getSchema() is called on it. This is because we only update the
>>>>> schema on the top-level row.
>>>>>
>>>>> I think this explains why your test works in the direct runner. If the
>>>>> row ever goes through an encode/decode path, it will come back correct.
>>>>> The original incorrect Java objects are no longer around, and new
>>>>> (consistent) objects are constructed from the raw data and the
>>>>> PCollection schema. Dataflow tends to fuse ParDos together, so the
>>>>> following ParDo will see the incorrect Row object. I bet the DirectRunner
>>>>> is encoding and decoding in between, which fixes the object.
>>>>>
>>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>>> using Reshuffle. It should fix the issue. If it does, let me know and
>>>>> I'll work on a fix to RenameFields.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Aha, yes, this is indeed another bug in the transform. The schema is
>>>>>> set on the top-level Row but not on any nested rows.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>>>>> respond to all feedback in a single message rather than messages per
>>>>>>> person.
>>>>>>>
>>>>>>>    - NeedsRunner - The tests are run eventually, so obviously all
>>>>>>>    good on my end.  I was trying to run the smallest subset of test
>>>>>>>    cases possible and didn't venture beyond `gradle test`.
>>>>>>>    - Stack Trace - There wasn't any, unfortunately, because no
>>>>>>>    exception was thrown in the code.  The Beam Row was translated into
>>>>>>>    a BQ TableRow and an insertion was attempted.  The error "message"
>>>>>>>    was part of the response JSON that came back as a result of a
>>>>>>>    request against the BQ API.
>>>>>>>    - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>>>    field0_1.nestedStringField is what I am looking for.
>>>>>>>    - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>       - The Beam Schema was as expected with all renames applied.
>>>>>>>       - The example I provided was heavily stripped down in order
>>>>>>>       to isolate the problem.  My work example, which is a bit
>>>>>>>       impractical because it's part of some generic tooling, has 4
>>>>>>>       levels of nesting and also produces the correct output.
>>>>>>>       - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>>>>>>>       in DirectRunner.  In DataflowRunner, however, only the top-level
>>>>>>>       renames were reflected in the TableRow; the renames in the
>>>>>>>       nested fields weren't.
>>>>>>>       - BigQueryUtils.toTableRow(Row) recurses on the Row values
>>>>>>>       and uses the Row.schema to get the field names.  This makes sense
>>>>>>>       to me, but if a value is actually a Row then its schema appears
>>>>>>>       to be inconsistent with the top-level schema.
>>>>>>>    - My Current Workaround - I forked RenameFields and replaced the
>>>>>>>    attachValues in the expand method with a "deep" rename.  This is
>>>>>>>    obviously inefficient and I will not be submitting a PR for that.
>>>>>>>    - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> This transform is the same across all runners. A few comments on
>>>>>>>> the test:
>>>>>>>>
>>>>>>>>   - Using attachValues directly is error prone (per the comment on
>>>>>>>> the method). I recommend using the withFieldValue builders instead.
>>>>>>>>   - I recommend capturing the RenameFields PCollection into a local
>>>>>>>> variable of type PCollection<Row> and printing out the schema (which
>>>>>>>> you can get using the PCollection.getSchema method) to ensure that the
>>>>>>>> output schema looks like you expect.
>>>>>>>>   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>>>>>>> nestedStringField results in field0_1.nestedStringField; if you wanted
>>>>>>>> to flatten, then the better transform would be
>>>>>>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>>>
>>>>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>>>>> makes me think that it is buggy in the case where you specify a
>>>>>>>> top-level field multiple times like you do. I think it is simply adding
>>>>>>>> the top-level field into the output schema multiple times, and the
>>>>>>>> second time is with the field0_1 base name; I have no idea why your
>>>>>>>> test doesn't catch this in the DirectRunner, as it's equally broken
>>>>>>>> there. Could you file a JIRA about this issue and assign it to me?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Matthew,
>>>>>>>>>>
>>>>>>>>>> > The unit tests also seem to be disabled for this as well and so
>>>>>>>>>> I don’t know if the PTransform behaves as expected.
>>>>>>>>>>
>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our
>>>>>>>>>> testing framework. NeedsRunner indicates that a test suite can't be
>>>>>>>>>> executed with the SDK alone; it needs a runner. So that exclusion
>>>>>>>>>> just makes sure we don't run the test when we're verifying the SDK
>>>>>>>>>> by itself in the :sdks:java:core:test task. The test is still run in
>>>>>>>>>> other tasks where we have a runner, most notably in the Java
>>>>>>>>>> PreCommit [1], where we run it as part of the
>>>>>>>>>> :runners:direct-java:test task.
>>>>>>>>>>
>>>>>>>>>> That being said, we may only run these tests continuously with
>>>>>>>>>> the DirectRunner; I'm not sure if we test them on all the runners
>>>>>>>>>> like we do with ValidatesRunner tests.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That is correct. The tests are tests _of the transform_ so they
>>>>>>>>> run only on the DirectRunner. They are not tests of the runner, which
>>>>>>>>> is only responsible for correctly implementing Beam's primitives. The
>>>>>>>>> transform should not behave differently on different runners, except
>>>>>>>>> for fundamental differences in how they schedule work and checkpoint.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> > The error message I’m receiving, "Error while reading data,
>>>>>>>>>> error message: JSON parsing error in row starting at position 0: No
>>>>>>>>>> such field: nestedField.field1_0", suggests that BigQuery is trying
>>>>>>>>>> to use the original name for the nested field and not the substitute
>>>>>>>>>> name.
>>>>>>>>>>
>>>>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>>
>>>>>>>>>> Brian
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>
>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I’m trying to use the RenameFields transform prior to inserting
>>>>>>>>>>> into BigQuery on nested fields.  Insertion into BigQuery is
>>>>>>>>>>> successful with DirectRunner, but DataflowRunner has an issue with
>>>>>>>>>>> renamed nested fields.  The error message I’m receiving, "Error
>>>>>>>>>>> while reading data, error message: JSON parsing error in row
>>>>>>>>>>> starting at position 0: No such field: nestedField.field1_0",
>>>>>>>>>>> suggests that BigQuery is trying to use the original name for the
>>>>>>>>>>> nested field and not the substitute name.
>>>>>>>>>>>
>>>>>>>>>>> The code for RenameFields seems simple enough but does it behave
>>>>>>>>>>> differently in different runners?  Will a deep attachValues be
>>>>>>>>>>> necessary in order to get the nested renames to work across all
>>>>>>>>>>> runners? Is there something wrong in my code?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>
>>>>>>>>>>> The unit tests also seem to be disabled for this as well and so
>>>>>>>>>>> I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>
>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>
>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>
>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>
>>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>>
>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>>         // Stage every entry on the local classpath.
>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>>>>>                         .map(entry -> (new File(entry)).toString())
>>>>>>>>>>>>                         .collect(Collectors.toList()));
>>>>>>>>>>>>
>>>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>>
>>>>>>>>>>>>         // Schema: two top-level strings and one nested row with a single string field.
>>>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>>>                 "value0_0",
>>>>>>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>>>                 options.getRunner().toString());
>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>>                 // Rename top-level fields and the field inside the nested row.
>>>>>>>>>>>>                 .apply(RenameFields.<Row>create()
>>>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>>>                 .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>>                         .to(new TableReference()
>>>>>>>>>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>>>                                 .setDatasetId("prototypes")
>>>>>>>>>>>>                                 .setTableId("matto_renameFields"))
>>>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>

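The "deep" rename workaround mentioned in the thread can be sketched with a
toy model. ToySchema, ToyRow, deepRename and toMap are hypothetical names
invented for illustration; they are not Beam classes and this is not the
actual fix to RenameFields. The sketch assumes only what the thread describes:
a converter that recurses over row values and takes field names from the
schema each row carries, so every nested row needs the renamed schema attached.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeepRenameSketch {
    // Hypothetical stand-in for a schema: field names, plus a nested schema for row-typed fields.
    static final class ToySchema {
        final String[] names;
        final ToySchema[] nested; // nested[i] is null when field i is not a row
        ToySchema(String[] names, ToySchema[] nested) { this.names = names; this.nested = nested; }
    }

    // Hypothetical stand-in for a row: values plus the schema object the row itself carries.
    static final class ToyRow {
        final ToySchema schema;
        final Object[] values;
        ToyRow(ToySchema schema, Object... values) { this.schema = schema; this.values = values; }
    }

    // Field names modelled on the example program in the thread.
    static final ToySchema OLD_NESTED = new ToySchema(new String[] {"field1_0"}, new ToySchema[] {null});
    static final ToySchema OLD_TOP = new ToySchema(
        new String[] {"field0_0", "field0_1", "field0_2"}, new ToySchema[] {null, OLD_NESTED, null});
    static final ToySchema NEW_NESTED = new ToySchema(new String[] {"nestedStringField"}, new ToySchema[] {null});
    static final ToySchema NEW_TOP = new ToySchema(
        new String[] {"stringField", "nestedField", "runner"}, new ToySchema[] {null, NEW_NESTED, null});

    static ToyRow sampleInput() {
        return new ToyRow(OLD_TOP, "value0_0", new ToyRow(OLD_NESTED, "value1_0"), "value0_2");
    }

    // Re-wraps the row and every nested row so each one carries the matching renamed schema.
    static ToyRow deepRename(ToyRow row, ToySchema renamed) {
        Object[] values = new Object[row.values.length];
        for (int i = 0; i < values.length; i++) {
            Object v = row.values[i];
            values[i] = v instanceof ToyRow ? deepRename((ToyRow) v, renamed.nested[i]) : v;
        }
        return new ToyRow(renamed, values);
    }

    // Converts a row to nested maps, naming fields from the schema each row carries itself.
    static Map<String, Object> toMap(ToyRow row) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (int i = 0; i < row.values.length; i++) {
            Object v = row.values[i];
            out.put(row.schema.names[i], v instanceof ToyRow ? toMap((ToyRow) v) : v);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toMap(deepRename(sampleInput(), NEW_TOP)));
    }
}
```

The cost is one extra object per nested row for every element, which matches
the thread's remark that the workaround is inefficient.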