There is no bug in the Coder itself, so that wouldn't catch it. We could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if the Direct runner already does an encode/decode before that ParDo, then that would have fixed the problem before we could see it.
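The point above, that the coder round trip itself is lossless while only the in-memory object is inconsistent, can be sketched with a toy model. This is plain Java for illustration, not Beam's actual Row/RowCoder classes: a schema-based coder serializes only raw values, so field names never cross the boundary.

```java
import java.util.*;

public class RoundTripFix {
    // Encode: a schema-based coder writes only the raw values, in field
    // order; field names are never serialized.
    static List<Object> encode(Map<String, ?> row, List<String> schema) {
        List<Object> raw = new ArrayList<>();
        for (String f : schema) raw.add(row.get(f));
        return raw;
    }

    // Decode: the row is rebuilt against the authoritative PCollection
    // schema, so any stale per-object naming is discarded.
    static Map<String, Object> decode(List<Object> raw, List<String> schema) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < schema.size(); i++) row.put(schema.get(i), raw.get(i));
        return row;
    }

    public static void main(String[] args) {
        // An object built with the stale, pre-rename field name...
        Map<String, String> stale = Map.of("field1_0", "value1_0");
        // ...comes back consistent after one encode/decode round trip,
        // which is why an encode/decode step masks the bug.
        Map<String, Object> fixed =
            decode(encode(stale, List.of("field1_0")), List.of("nestedStringField"));
        System.out.println(fixed); // {nestedStringField=value1_0}
    }
}
```

Because names never cross the boundary, any round trip is trivially "equal" at the coder level, consistent with the observation that coderDecodeEncodeEqual would not catch this.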
On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:

> Would it be caught by CoderProperties?
>
> Kenn
>
> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>
>> I don't think this bug is schema specific - we created a Java object that
>> is inconsistent with its encoded form, which could happen to any transform.
>>
>> This does seem to be a gap in DirectRunner testing though. It also makes
>> it hard to test using PAssert, as I believe that puts everything in a side
>> input, forcing an encoding/decoding.
>>
>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> +dev <d...@beam.apache.org>
>>>
>>> > I bet the DirectRunner is encoding and decoding in between, which
>>> fixes the object.
>>>
>>> Do we need better testing of schema-aware (and potentially other
>>> built-in) transforms in the face of fusion to root out issues like this?
>>>
>>> Brian
>>>
>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com>
>>> wrote:
>>>
>>>> I have some other work-related things I need to do this week, so I will
>>>> likely report back on this over the weekend. Thank you for the
>>>> explanation. It makes perfect sense now.
>>>>
>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Some more context - the problem is that RenameFields outputs (in this
>>>>> case) Java Row objects that are inconsistent with the actual schema.
>>>>> For example if you have the following schema:
>>>>>
>>>>> Row {
>>>>>   field1: Row {
>>>>>     field2: string
>>>>>   }
>>>>> }
>>>>>
>>>>> And rename field1.field2 -> renamed, you'll get the following schema:
>>>>>
>>>>> Row {
>>>>>   field1: Row {
>>>>>     renamed: string
>>>>>   }
>>>>> }
>>>>>
>>>>> However, the Java object for the _nested_ row will return the old
>>>>> schema if getSchema() is called on it. This is because we only update the
>>>>> schema on the top-level row.
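The situation described above can be sketched with a toy model (plain Java for illustration, NOT Beam's actual Row class): each row object carries its own idea of its schema, and only the top-level object learns about the rename.

```java
import java.util.List;

// Toy stand-in for a schema-carrying row: each object holds the field
// names it believes it has, plus its values.
class ToyRow {
    List<String> schemaFieldNames; // the schema this object believes it has
    List<Object> values;

    ToyRow(List<String> schemaFieldNames, List<Object> values) {
        this.schemaFieldNames = schemaFieldNames;
        this.values = values;
    }
}

public class StaleNestedSchema {
    // Mimics the behavior described above: a renamed schema is attached to
    // the top-level row, but nested row objects are reused as-is.
    static ToyRow renameTopLevelOnly(ToyRow row, List<String> renamedSchema) {
        return new ToyRow(renamedSchema, row.values); // nested rows untouched
    }

    public static void main(String[] args) {
        ToyRow nested = new ToyRow(List.of("field2"), List.<Object>of("hello"));
        ToyRow top = new ToyRow(List.of("field1"), List.<Object>of(nested));

        // Rename field1.field2 -> renamed: only the top-level object is
        // rebuilt; the nested object still reports its old field name.
        ToyRow renamed = renameTopLevelOnly(top, List.of("field1"));
        ToyRow nestedAfter = (ToyRow) renamed.values.get(0);
        System.out.println(nestedAfter.schemaFieldNames); // [field2]
    }
}
```

Any downstream code that asks the nested object for its schema (rather than consulting the top-level schema) will see the stale name.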
>>>>>
>>>>> I think this explains why your test works in the direct runner. If the
>>>>> row ever goes through an encode/decode path, it will come back correct. The
>>>>> original incorrect Java objects are no longer around, and new (consistent)
>>>>> objects are constructed from the raw data and the PCollection schema.
>>>>> Dataflow tends to fuse ParDos together, so the following ParDo will see the
>>>>> incorrect Row object. I bet the DirectRunner is encoding and decoding in
>>>>> between, which fixes the object.
>>>>>
>>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>>> using Reshuffle. It should fix the issue. If it does, let me know and I'll
>>>>> work on a fix to RenameFields.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Aha, yes, this is indeed another bug in the transform. The schema is set
>>>>>> on the top-level Row but not on any nested rows.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you everyone for your input. I believe it will be easiest to
>>>>>>> respond to all feedback in a single message rather than messages per
>>>>>>> person.
>>>>>>>
>>>>>>> - NeedsRunner - The tests are run eventually, so obviously all
>>>>>>> good on my end. I was trying to run the smallest subset of test cases
>>>>>>> possible and didn't venture beyond `gradle test`.
>>>>>>> - Stack Trace - There wasn't one, unfortunately, because no
>>>>>>> exception was thrown in the code. The Beam Row was translated into a BQ
>>>>>>> TableRow and an insertion was attempted. The error "message" was part of
>>>>>>> the response JSON that came back as a result of a request against the BQ
>>>>>>> API.
>>>>>>> - Desired Behaviour - (field0_1.field1_0, nestedStringField) ->
>>>>>>> field0_1.nestedStringField is what I am looking for.
>>>>>>> - Info Logging Findings (In Lieu of a Stack Trace):
>>>>>>>   - The Beam Schema was as expected, with all renames applied.
>>>>>>>   - The example I provided was heavily stripped down in order to
>>>>>>>     isolate the problem. My work example, which is a bit impractical
>>>>>>>     because it's part of some generic tooling, has 4 levels of nesting
>>>>>>>     and also produces the correct output.
>>>>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow in
>>>>>>>     DirectRunner. In DataflowRunner, however, only the top-level renames
>>>>>>>     were reflected in the TableRow; the renames in the nested fields
>>>>>>>     weren't.
>>>>>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses
>>>>>>>     the Row.schema to get the field names. This makes sense to me, but
>>>>>>>     if a value is actually a Row then its schema appears to be
>>>>>>>     inconsistent with the top-level schema.
>>>>>>> - My Current Workaround - I forked RenameFields and replaced the
>>>>>>> attachValues in the expand method with a "deep" rename. This is obviously
>>>>>>> inefficient and I will not be submitting a PR for that.
>>>>>>> - JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> This transform is the same across all runners. A few comments on
>>>>>>>> the test:
>>>>>>>>
>>>>>>>> - Using attachValues directly is error prone (per the comment on
>>>>>>>> the method). I recommend using the withFieldValue builders instead.
>>>>>>>> - I recommend capturing the RenameFields PCollection into a local
>>>>>>>> variable of type PCollection<Row> and printing out the schema (which you
>>>>>>>> can get using the PCollection.getSchema method) to ensure that the output
>>>>>>>> schema looks like you expect.
>>>>>>>> - RenameFields doesn't flatten.
So renaming field0_1.field1_0 ->
>>>>>>>> nestedStringField results in field0_1.nestedStringField; if you wanted
>>>>>>>> to flatten, then the better transform would be
>>>>>>>> Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>>>
>>>>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>>>>> makes me think that it is buggy in the case where you specify a top-level
>>>>>>>> field multiple times like you do. I think it is simply adding the top-level
>>>>>>>> field into the output schema multiple times, and the second time is with
>>>>>>>> the field0_1 base name; I have no idea why your test doesn't catch this in
>>>>>>>> the DirectRunner, as it's equally broken there. Could you file a JIRA about
>>>>>>>> this issue and assign it to me?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Matthew,
>>>>>>>>>>
>>>>>>>>>> > The unit tests also seem to be disabled for this as well and so
>>>>>>>>>> I don't know if the PTransform behaves as expected.
>>>>>>>>>>
>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our
>>>>>>>>>> testing framework. NeedsRunner indicates that a test suite can't be
>>>>>>>>>> executed with the SDK alone; it needs a runner. So that exclusion just
>>>>>>>>>> makes sure we don't run the test when we're verifying the SDK by itself in
>>>>>>>>>> the :sdks:java:core:test task. The test is still run in other tasks where
>>>>>>>>>> we have a runner, most notably in the Java PreCommit [1], where we run it
>>>>>>>>>> as part of the :runners:direct-java:test task.
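Reuven's hypothesis above, that the transform adds the same top-level field to the output schema once per rename entry, can be sketched with a toy comparison. This is an illustration of the suspected failure mode and a grouped alternative, not Beam's actual implementation:

```java
import java.util.*;

public class DuplicateFieldSketch {
    // Suspected buggy shape: emit one output field per rename entry, so the
    // two renames touching "field0_1" ("field0_1" itself and
    // "field0_1.field1_0") produce that top-level field twice.
    static List<String> naiveOutputFields(List<String> renamedPaths) {
        List<String> out = new ArrayList<>();
        for (String path : renamedPaths) {
            out.add(path.split("\\.")[0]); // top-level component only
        }
        return out;
    }

    // Grouping rename entries by their top-level field first yields each
    // output field exactly once.
    static List<String> groupedOutputFields(List<String> renamedPaths) {
        LinkedHashSet<String> out = new LinkedHashSet<>();
        for (String path : renamedPaths) {
            out.add(path.split("\\.")[0]);
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<String> renames =
            List.of("field0_0", "field0_1", "field0_1.field1_0", "field0_2");
        System.out.println(naiveOutputFields(renames));
        // [field0_0, field0_1, field0_1, field0_2]  <- field0_1 duplicated
        System.out.println(groupedOutputFields(renames));
        // [field0_0, field0_1, field0_2]
    }
}
```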
>>>>>>>>>>
>>>>>>>>>> That being said, we may only run these tests continuously with
>>>>>>>>>> the DirectRunner; I'm not sure if we test them on all the runners like we
>>>>>>>>>> do with ValidatesRunner tests.
>>>>>>>>>>
>>>>>>>>> That is correct. The tests are tests _of the transform_, so they
>>>>>>>>> run only on the DirectRunner. They are not tests of the runner, which is
>>>>>>>>> only responsible for correctly implementing Beam's primitives. The
>>>>>>>>> transform should not behave differently on different runners, except for
>>>>>>>>> fundamental differences in how they schedule work and checkpoint.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>> > The error message I'm receiving, "Error while reading data,
>>>>>>>>>> error message: JSON parsing error in row starting at position 0: No such
>>>>>>>>>> field: nestedField.field1_0", suggests that BigQuery is trying to
>>>>>>>>>> use the original name for the nested field and not the substitute name.
>>>>>>>>>>
>>>>>>>>>> Is there a stack trace associated with this error? It would be
>>>>>>>>>> helpful to see where the error is coming from.
>>>>>>>>>>
>>>>>>>>>> Brian
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>
>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <
>>>>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm trying to use the RenameFields transform prior to inserting
>>>>>>>>>>> into BigQuery on nested fields.
Insertion into BigQuery is successful with
>>>>>>>>>>> DirectRunner, but DataflowRunner has an issue with renamed nested fields.
>>>>>>>>>>> The error message I'm receiving, "Error while reading data,
>>>>>>>>>>> error message: JSON parsing error in row starting at position 0: No such
>>>>>>>>>>> field: nestedField.field1_0", suggests that BigQuery is trying to
>>>>>>>>>>> use the original name for the nested field and not the substitute name.
>>>>>>>>>>>
>>>>>>>>>>> The code for RenameFields seems simple enough, but does it behave
>>>>>>>>>>> differently in different runners? Will a deep attachValues be necessary
>>>>>>>>>>> in order to get the nested renames to work across all runners? Is there
>>>>>>>>>>> something wrong in my code?
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>
>>>>>>>>>>> The unit tests also seem to be disabled for this as well and so
>>>>>>>>>>> I don't know if the PTransform behaves as expected.
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>
>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>
>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>
>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>
>>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>>
>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>>             PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>             Arrays.stream(System.getProperty("java.class.path")
>>>>>>>>>>>>                     .split(File.pathSeparator))
>>>>>>>>>>>>                 .map(entry -> (new File(entry)).toString())
>>>>>>>>>>>>                 .collect(Collectors.toList()));
>>>>>>>>>>>>
>>>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>>
>>>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>>>             .build();
>>>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>>         Schema.Field nested =
>>>>>>>>>>>>             Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>>>             .addFields(field, nested, runner)
>>>>>>>>>>>>             .build();
>>>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>>>             "value0_0",
>>>>>>>>>>>>             Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>>>             options.getRunner().toString());
>>>>>>>>>>>>
>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>             .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>>             .apply(RenameFields.<Row>create()
>>>>>>>>>>>>                 .rename("field0_0", "stringField")
>>>>>>>>>>>>                 .rename("field0_1", "nestedField")
>>>>>>>>>>>>                 .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>>                 .rename("field0_2", "runner"))
>>>>>>>>>>>>             .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>>                 .to(new TableReference()
>>>>>>>>>>>>                     .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>>>                     .setDatasetId("prototypes")
>>>>>>>>>>>>                     .setTableId("matto_renameFields"))
>>>>>>>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>>                 .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>>>                     asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>>                 .useBeamSchema());
>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>
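The "deep" rename workaround mentioned earlier in the thread can be sketched on a toy row model. This is plain Java for illustration, not Beam's actual Row/attachValues API: instead of reusing nested value objects that still carry old names, every nested row is rebuilt against its renamed schema.

```java
import java.util.*;

public class DeepRenameSketch {
    // Toy stand-in for a schema-carrying row.
    static class ToyRow {
        List<String> schema;
        List<Object> values;
        ToyRow(List<String> schema, List<Object> values) {
            this.schema = schema;
            this.values = values;
        }
    }

    // Deep rename: recurse into nested rows, rebuilding each one so its
    // own schema reflects the rename map (keyed by dotted field path).
    static ToyRow deepRename(ToyRow row, Map<String, String> renames, String prefix) {
        List<String> newSchema = new ArrayList<>();
        List<Object> newValues = new ArrayList<>();
        for (int i = 0; i < row.schema.size(); i++) {
            String path = prefix + row.schema.get(i);
            newSchema.add(renames.getOrDefault(path, row.schema.get(i)));
            Object v = row.values.get(i);
            newValues.add(v instanceof ToyRow
                ? deepRename((ToyRow) v, renames, path + ".")
                : v);
        }
        return new ToyRow(newSchema, newValues);
    }

    public static void main(String[] args) {
        ToyRow nested = new ToyRow(List.of("field1_0"), List.<Object>of("value1_0"));
        ToyRow top = new ToyRow(List.of("field0_1"), List.<Object>of(nested));
        Map<String, String> renames = Map.of(
            "field0_1", "nestedField",
            "field0_1.field1_0", "nestedStringField");
        ToyRow out = deepRename(top, renames, "");
        System.out.println(out.schema);                          // [nestedField]
        System.out.println(((ToyRow) out.values.get(0)).schema); // [nestedStringField]
    }
}
```

Rebuilding every nested object is what makes this approach correct under fusion, and also what makes it inefficient relative to a fix inside RenameFields itself.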