I don't think this bug is schema-specific - we created a Java object that is inconsistent with its encoded form, which could happen to any transform.
This does seem to be a gap in DirectRunner testing though. It also makes it hard to test using PAssert, as I believe that puts everything in a side input, forcing an encoding/decoding.

On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:

> +dev <d...@beam.apache.org>
>
> > I bet the DirectRunner is encoding and decoding in between, which fixes
> the object.
>
> Do we need better testing of schema-aware (and potentially other built-in)
> transforms in the face of fusion to root out issues like this?
>
> Brian
>
> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com>
> wrote:
>
>> I have some other work-related things I need to do this week, so I will
>> likely report back on this over the weekend. Thank you for the
>> explanation. It makes perfect sense now.
>>
>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Some more context - the problem is that RenameFields outputs (in this
>>> case) Java Row objects that are inconsistent with the actual schema.
>>> For example, if you have the following schema:
>>>
>>> Row {
>>>   field1: Row {
>>>     field2: string
>>>   }
>>> }
>>>
>>> And rename field1.field2 -> renamed, you'll get the following schema:
>>>
>>> Row {
>>>   field1: Row {
>>>     renamed: string
>>>   }
>>> }
>>>
>>> However, the Java object for the _nested_ row will return the old schema
>>> if getSchema() is called on it. This is because we only update the schema
>>> on the top-level row.
>>>
>>> I think this explains why your test works in the direct runner. If the
>>> row ever goes through an encode/decode path, it will come back correct. The
>>> original incorrect Java objects are no longer around, and new (consistent)
>>> objects are constructed from the raw data and the PCollection schema.
>>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>>> between, which fixes the object.
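[Editor's note: the nested-schema inconsistency described above can be sketched outside Beam with plain Java maps standing in for Row objects. This is a toy model, not Beam code — `ShallowRenameSketch`, `shallowRename`, and the field names are illustrative stand-ins.]

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the bug: a "row" is a map, and a RenameFields-style rewrite
// touches only the top-level keys. The nested map (the nested Row in Beam)
// still carries its old field name, so any consumer that walks the values —
// the way BigQueryUtils.toTableRow walks nested Rows — sees the stale name.
public class ShallowRenameSketch {
    public static Map<String, Object> shallowRename(
            Map<String, Object> row, String oldName, String newName) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            // Rename at the top level only; values (including nested maps)
            // are carried over untouched.
            out.put(e.getKey().equals(oldName) ? newName : e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "value1_0");
        Map<String, Object> top = new LinkedHashMap<>();
        top.put("field0_1", nested);

        Map<String, Object> renamed = shallowRename(top, "field0_1", "nestedField");

        System.out.println(renamed.containsKey("nestedField")); // true
        Map<?, ?> inner = (Map<?, ?>) renamed.get("nestedField");
        System.out.println(inner.containsKey("field1_0"));      // true: stale inner name survives
    }
}
```

An encode/decode boundary (such as a shuffle) discards these stale in-memory objects and rebuilds them from the authoritative PCollection schema, which is why the inconsistency disappears after a shuffle.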
>>>
>>> You can validate this theory by forcing a shuffle after RenameFields
>>> using Reshuffle. That should fix the issue. If it does, let me know and
>>> I'll work on a fix to RenameFields.
>>>
>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Aha, yes, this is indeed another bug in the transform. The schema is
>>>> set on the top-level Row but not on any nested rows.
>>>>
>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you everyone for your input. I believe it will be easiest to
>>>>> respond to all feedback in a single message rather than messages per
>>>>> person.
>>>>>
>>>>> - NeedsRunner - The tests are run eventually, so all good on my end.
>>>>>   I was trying to run the smallest subset of test cases possible and
>>>>>   didn't venture beyond `gradle test`.
>>>>> - Stack Trace - There wasn't one, unfortunately, because no exception
>>>>>   was thrown in the code. The Beam Row was translated into a BQ
>>>>>   TableRow and an insertion was attempted. The error "message" was
>>>>>   part of the response JSON that came back as a result of a request
>>>>>   against the BQ API.
>>>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>   field0_1.nestedStringField is what I am looking for.
>>>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>   - The Beam Schema was as expected, with all renames applied.
>>>>>   - The example I provided was heavily stripped down in order to
>>>>>     isolate the problem. My work example, which is a bit impractical
>>>>>     because it's part of some generic tooling, has 4 levels of nesting
>>>>>     and also produces the correct output.
>>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>>>>     DirectRunner. In DataflowRunner, however, only the top-level
>>>>>     renames were reflected in the TableRow; the renames in the nested
>>>>>     fields weren't.
>>>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>>>>     uses the Row.schema to get the field names. This makes sense to
>>>>>     me, but if a value is actually a Row then its schema appears to be
>>>>>     inconsistent with the top-level schema.
>>>>> - My Current Workaround - I forked RenameFields and replaced the
>>>>>   attachValues call in the expand method with a "deep" rename. This is
>>>>>   obviously inefficient and I will not be submitting a PR for that.
>>>>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>
>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> This transform is the same across all runners. A few comments on the
>>>>>> test:
>>>>>>
>>>>>> - Using attachValues directly is error-prone (per the comment on the
>>>>>>   method). I recommend using the withFieldValue builders instead.
>>>>>> - I recommend capturing the RenameFields output in a local variable
>>>>>>   of type PCollection<Row> and printing out the schema (which you can
>>>>>>   get using the PCollection.getSchema method) to ensure that the
>>>>>>   output schema looks like you expect.
>>>>>> - RenameFields doesn't flatten. Renaming field0_1.field1_0 ->
>>>>>>   nestedStringField results in field0_1.nestedStringField; if you
>>>>>>   wanted to flatten, then the better transform would be
>>>>>>   Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>
>>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>>> makes me think that it is buggy in the case where you specify a
>>>>>> top-level field multiple times like you do. I think it is simply
>>>>>> adding the top-level field into the output schema multiple times, and
>>>>>> the second time is with the field0_1 base name; I have no idea why
>>>>>> your test doesn't catch this in the DirectRunner, as it's equally
>>>>>> broken there. Could you file a JIRA about this issue and assign it to
>>>>>> me?
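[Editor's note: the "deep" rename workaround Matthew describes can be sketched with the same plain-map model of rows. `DeepRenameSketch` and its dotted-path convention are illustrative assumptions, not the Beam implementation.]

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a "deep" rename: walk the value tree and apply path-based
// renames at every level, so nested rows are rebuilt with their new field
// names instead of being carried over with stale ones. Rows are modeled as
// maps; rename paths use dotted notation keyed by the ORIGINAL names.
public class DeepRenameSketch {
    public static Map<String, Object> deepRename(
            Map<String, Object> row, Map<String, String> renames, String prefix) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            // Build the dotted path from the original field names, since
            // the rename map is keyed by pre-rename paths.
            String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            String name = renames.getOrDefault(path, e.getKey());
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nestedRow = (Map<String, Object>) value;
                value = deepRename(nestedRow, renames, path); // recurse into nested rows
            }
            out.put(name, value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "value1_0");
        Map<String, Object> top = new LinkedHashMap<>();
        top.put("field0_1", nested);
        Map<String, String> renames = new LinkedHashMap<>();
        renames.put("field0_1", "nestedField");
        renames.put("field0_1.field1_0", "nestedStringField");
        System.out.println(deepRename(top, renames, ""));
        // {nestedField={nestedStringField=value1_0}}
    }
}
```

Rebuilding every nested value this way is what makes the workaround correct but inefficient, as noted above: it copies the whole value tree per element rather than renaming at the schema level once.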
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Matthew,
>>>>>>>>
>>>>>>>> > The unit tests also seem to be disabled for this as well and so I
>>>>>>>> don't know if the PTransform behaves as expected.
>>>>>>>>
>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>>>>>> framework. NeedsRunner indicates that a test suite can't be
>>>>>>>> executed with the SDK alone; it needs a runner. So that exclusion
>>>>>>>> just makes sure we don't run the test when we're verifying the SDK
>>>>>>>> by itself in the :sdks:java:core:test task. The test is still run
>>>>>>>> in other tasks where we have a runner, most notably in the Java
>>>>>>>> PreCommit [1], where we run it as part of the
>>>>>>>> :runners:direct-java:test task.
>>>>>>>>
>>>>>>>> That being said, we may only run these tests continuously with the
>>>>>>>> DirectRunner. I'm not sure if we test them on all the runners like
>>>>>>>> we do with ValidatesRunner tests.
>>>>>>>
>>>>>>> That is correct. The tests are tests _of the transform_, so they run
>>>>>>> only on the DirectRunner. They are not tests of the runner, which is
>>>>>>> only responsible for correctly implementing Beam's primitives. The
>>>>>>> transform should not behave differently on different runners, except
>>>>>>> for fundamental differences in how they schedule work and
>>>>>>> checkpoint.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>> > The error message I'm receiving, "Error while reading data, error
>>>>>>>> message: JSON parsing error in row starting at position 0: No such
>>>>>>>> field: nestedField.field1_0", suggests that BigQuery is trying to
>>>>>>>> use the original name for the nested field and not the substitute
>>>>>>>> name.
>>>>>>>>
>>>>>>>> Is there a stack trace associated with this error? It would be
>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>
>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I’m trying to use the RenameFields transform prior to inserting
>>>>>>>>> into BigQuery on nested fields. Insertion into BigQuery is
>>>>>>>>> successful with DirectRunner, but DataflowRunner has an issue with
>>>>>>>>> renamed nested fields. The error message I’m receiving, "Error
>>>>>>>>> while reading data, error message: JSON parsing error in row
>>>>>>>>> starting at position 0: No such field: nestedField.field1_0",
>>>>>>>>> suggests that BigQuery is trying to use the original name for the
>>>>>>>>> nested field and not the substitute name.
>>>>>>>>>
>>>>>>>>> The code for RenameFields seems simple enough, but does it behave
>>>>>>>>> differently in different runners? Will a deep attachValues be
>>>>>>>>> necessary in order to get the nested renames to work across all
>>>>>>>>> runners? Is there something wrong in my code?
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>
>>>>>>>>> The unit tests also seem to be disabled for this as well, and so I
>>>>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>
>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>
>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>
>>>>>>>>>> import java.io.File;
>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>
>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>
>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>             PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>             Arrays.stream(System.getProperty("java.class.path")
>>>>>>>>>>                     .split(File.pathSeparator))
>>>>>>>>>>                 .map(entry -> (new File(entry)).toString())
>>>>>>>>>>                 .collect(Collectors.toList()));
>>>>>>>>>>
>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>
>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>             .build();
>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>         Schema.Field nested =
>>>>>>>>>>             Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>             .addFields(field, nested, runner)
>>>>>>>>>>             .build();
>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>             "value0_0",
>>>>>>>>>>             Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>             options.getRunner().toString());
>>>>>>>>>>
>>>>>>>>>>         pipeline
>>>>>>>>>>             .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>             .apply(RenameFields.<Row>create()
>>>>>>>>>>                 .rename("field0_0", "stringField")
>>>>>>>>>>                 .rename("field0_1", "nestedField")
>>>>>>>>>>                 .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>                 .rename("field0_2", "runner"))
>>>>>>>>>>             .apply(BigQueryIO.<Row>write()
>>>>>>>>>>                 .to(new TableReference()
>>>>>>>>>>                     .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>                     .setDatasetId("prototypes")
>>>>>>>>>>                     .setTableId("matto_renameFields"))
>>>>>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>                 .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>                     asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>                 .useBeamSchema());
>>>>>>>>>>         pipeline.run();
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>