Correct.

On Thu, Jun 3, 2021 at 9:51 AM Kenneth Knowles <k...@apache.org> wrote:

> I still don't quite grok the details of how this succeeds or fails in different situations. The invalid row succeeds in serialization because the coder is not sensitive to the way in which it is invalid?
>
> Kenn
>
> On Wed, Jun 2, 2021 at 2:54 PM Brian Hulette <bhule...@google.com> wrote:
>
>> > One thing that's been on the back burner for a long time is making CoderProperties into a CoderTester like Guava's EqualsTester.
>>
>> Reuven's point still applies here, though. This issue is not due to a bug in SchemaCoder; it's a problem with the Row we gave SchemaCoder to encode. I'm assuming a CoderTester would require manually generating inputs, right? These input Rows represent an illegal state that we wouldn't test with. (That being said, I like the idea of a CoderTester in general.)
>>
>> Brian
>>
>> On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Mutability checking might catch that.
>>>
>>> I meant to suggest not putting the check in the pipeline, but offering a testing discipline that will catch such issues. One thing that's been on the back burner for a long time is making CoderProperties into a CoderTester like Guava's EqualsTester. Then it can run through all the properties without a user setting up test suites. The downside is that the test failure signal gets aggregated.
>>>
>>> Kenn
>>>
>>> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette <bhule...@google.com> wrote:
>>>
>>>> Could the DirectRunner just do an equality check whenever it does an encode/decode? It sounds like it's already effectively performing a CoderProperties.coderDecodeEncodeEqual for every element, just omitting the equality check.
>>>>
>>>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> There is no bug in the Coder itself, so that wouldn't catch it. We could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if the DirectRunner already does an encode/decode before that ParDo, then that would have fixed the problem before we could see it.
>>>>>
>>>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> Would it be caught by CoderProperties?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> I don't think this bug is schema-specific: we created a Java object that is inconsistent with its encoded form, which could happen to any transform.
>>>>>>>
>>>>>>> This does seem to be a gap in DirectRunner testing, though. It also makes it hard to test using PAssert, as I believe that puts everything in a side input, forcing an encoding/decoding.
>>>>>>>
>>>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette <bhule...@google.com> wrote:
>>>>>>>
>>>>>>>> +dev <d...@beam.apache.org>
>>>>>>>>
>>>>>>>> > I bet the DirectRunner is encoding and decoding in between, which fixes the object.
>>>>>>>>
>>>>>>>> Do we need better testing of schema-aware (and potentially other built-in) transforms in the face of fusion to root out issues like this?
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I have some other work-related things I need to do this week, so I will likely report back on this over the weekend. Thank you for the explanation. It makes perfect sense now.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Some more context: the problem is that RenameFields outputs (in this case) Java Row objects that are inconsistent with the actual schema.
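Reuven's point here, that a Row object can disagree with the schema it is supposed to have, can be made concrete with a minimal, self-contained model. This is a sketch, not Beam code: `MiniSchema`, `MiniRow` and `ShallowRename` are hypothetical stand-ins for Beam's `Schema` and `Row`, and the sketch assumes, as described in this thread, that the rename swaps only the top-level schema while reusing the nested row objects, and that the BigQuery conversion takes field names from each row's own schema.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a schema (not Beam's Schema): field names plus,
// per field, a nested schema or null for a primitive field such as a string.
final class MiniSchema {
  final List<String> names;
  final List<MiniSchema> nested;

  MiniSchema(List<String> names, List<MiniSchema> nested) {
    this.names = names;
    this.nested = nested;
  }

  // Convenience factory for a one-field schema.
  static MiniSchema single(String name, MiniSchema nestedOrNull) {
    return new MiniSchema(
        Collections.singletonList(name), Collections.singletonList(nestedOrNull));
  }
}

// Hypothetical stand-in for a row: positional values plus the schema object
// this particular Java object carries (what getSchema() would return).
final class MiniRow {
  final MiniSchema schema;
  final List<Object> values;

  MiniRow(MiniSchema schema, List<Object> values) {
    this.schema = schema;
    this.values = values;
  }
}

final class ShallowRename {
  // Models the behaviour described in the thread: only the top-level schema
  // is replaced; nested MiniRow values are reused and keep their old schema.
  static MiniRow rename(MiniRow input, MiniSchema renamedSchema) {
    return new MiniRow(renamedSchema, input.values);
  }

  // Models a converter that recurses on values and reads field names from
  // each row's own schema rather than from the top-level schema.
  static Map<String, Object> toFieldMap(MiniRow row) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (int i = 0; i < row.values.size(); i++) {
      Object value = row.values.get(i);
      out.put(row.schema.names.get(i),
          value instanceof MiniRow ? toFieldMap((MiniRow) value) : value);
    }
    return out;
  }

  public static void main(String[] args) {
    // Row { field1: Row { field2: string } }, renaming field1.field2 -> renamed.
    MiniSchema inner = MiniSchema.single("field2", null);
    MiniSchema outer = MiniSchema.single("field1", inner);
    MiniSchema renamed = MiniSchema.single("field1", MiniSchema.single("renamed", null));

    MiniRow row = new MiniRow(outer, Collections.<Object>singletonList(
        new MiniRow(inner, Collections.<Object>singletonList("x"))));
    MiniRow out = rename(row, renamed);

    // The top-level schema declares "renamed", but this prints
    // {field1={field2=x}} because the nested object still has the old schema.
    System.out.println(toFieldMap(out));
  }
}
```

Run as written, it prints `{field1={field2=x}}`. This is the same shape of failure as the `No such field: nestedField.field1_0` error in the original report: the declared schema is right, but a nested object still reports the old name.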
>>>>>>>>>>
>>>>>>>>>> For example, if you have the following schema:
>>>>>>>>>>
>>>>>>>>>>   Row {
>>>>>>>>>>     field1: Row {
>>>>>>>>>>       field2: string
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> and you rename field1.field2 -> renamed, you'll get the following schema:
>>>>>>>>>>
>>>>>>>>>>   Row {
>>>>>>>>>>     field1: Row {
>>>>>>>>>>       renamed: string
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> However, the Java object for the _nested_ row will return the old schema if getSchema() is called on it. This is because we only update the schema on the top-level row.
>>>>>>>>>>
>>>>>>>>>> I think this explains why your test works in the DirectRunner. If the row ever goes through an encode/decode path, it will come back correct: the original incorrect Java objects are no longer around, and new (consistent) objects are constructed from the raw data and the PCollection schema. Dataflow tends to fuse ParDos together, so the following ParDo will see the incorrect Row object. I bet the DirectRunner is encoding and decoding in between, which fixes the object.
>>>>>>>>>>
>>>>>>>>>> You can validate this theory by forcing a shuffle after RenameFields using Reshuffle. It should fix the issue. If it does, let me know and I'll work on a fix to RenameFields.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Aha, yes, this is indeed another bug in the transform. The schema is set on the top-level Row but not on any nested rows.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you everyone for your input. I believe it will be easiest to respond to all feedback in a single message rather than one message per person.
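The masking effect Reuven describes in the message quoted above, where an encode/decode rebuilds nested rows from the PCollection schema, can be illustrated with the same kind of hypothetical model (again not Beam's API). Here `encode` keeps only the values, `decode` rebuilds every row from the schema it is handed, and `isConsistent` checks that each nested row carries the schema its parent field declares. The consistency check is an illustrative assumption in the spirit of the equality-after-round-trip idea discussed in this thread; it can only catch the problem if it inspects nested schemas and runs before any encode/decode.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a schema: field names plus an optional nested
// schema per field (null for primitive fields).
final class MiniSchema {
  final List<String> names;
  final List<MiniSchema> nested;

  MiniSchema(List<String> names, List<MiniSchema> nested) {
    this.names = names;
    this.nested = nested;
  }

  static MiniSchema single(String name, MiniSchema nestedOrNull) {
    return new MiniSchema(
        Collections.singletonList(name), Collections.singletonList(nestedOrNull));
  }
}

// Hypothetical stand-in for a row that carries its own schema object.
final class MiniRow {
  final MiniSchema schema;
  final List<Object> values;

  MiniRow(MiniSchema schema, List<Object> values) {
    this.schema = schema;
    this.values = values;
  }
}

final class RoundTrip {
  // "Encoding" keeps only the values; nested rows become nested value lists,
  // so whatever schema a Java object carried is dropped.
  static List<Object> encode(MiniRow row) {
    List<Object> out = new ArrayList<>();
    for (Object v : row.values) {
      out.add(v instanceof MiniRow ? encode((MiniRow) v) : v);
    }
    return out;
  }

  // "Decoding" rebuilds every row, nested ones included, from the schema it is
  // given (in the pipeline case, the PCollection's schema).
  @SuppressWarnings("unchecked")
  static MiniRow decode(List<Object> encoded, MiniSchema schema) {
    List<Object> values = new ArrayList<>();
    for (int i = 0; i < encoded.size(); i++) {
      MiniSchema child = schema.nested.get(i);
      Object v = encoded.get(i);
      values.add(child != null && v != null ? decode((List<Object>) v, child) : v);
    }
    return new MiniRow(schema, values);
  }

  // Structural schema comparison: same names and same nested schemas.
  static boolean sameSchema(MiniSchema a, MiniSchema b) {
    if (a == null || b == null) return a == b;
    if (!a.names.equals(b.names) || a.nested.size() != b.nested.size()) return false;
    for (int i = 0; i < a.nested.size(); i++) {
      if (!sameSchema(a.nested.get(i), b.nested.get(i))) return false;
    }
    return true;
  }

  // True when every non-null nested row carries the schema its parent declares.
  static boolean isConsistent(MiniRow row) {
    for (int i = 0; i < row.values.size(); i++) {
      MiniSchema declared = row.schema.nested.get(i);
      Object v = row.values.get(i);
      if (declared != null && v != null) {
        MiniRow child = (MiniRow) v;
        if (!sameSchema(declared, child.schema) || !isConsistent(child)) return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    MiniSchema inner = MiniSchema.single("field2", null);
    MiniSchema renamed = MiniSchema.single("field1", MiniSchema.single("renamed", null));
    // A shallowly renamed row: the top-level schema declares "renamed", but the
    // nested object still carries the old schema with "field2".
    MiniRow stale = new MiniRow(renamed, Collections.<Object>singletonList(
        new MiniRow(inner, Collections.<Object>singletonList("x"))));
    MiniRow decoded = decode(encode(stale), renamed);
    // Prints "false true": the round trip discards the stale nested schema.
    System.out.println(isConsistent(stale) + " " + isConsistent(decoded));
  }
}
```

In this model the stale row fails the check and its round-tripped copy passes, which illustrates why a check placed after a runner-inserted encode/decode (or after a PAssert that forces one) would not see the problem.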
>>>>>>>>>>>>
>>>>>>>>>>>> - NeedsRunner: The tests are run eventually, so obviously all good on my end. I was trying to run the smallest subset of test cases possible and didn't venture beyond `gradle test`.
>>>>>>>>>>>> - Stack Trace: Unfortunately there wasn't one, because no exception was thrown in the code. The Beam Row was translated into a BQ TableRow and an insertion was attempted. The error "message" was part of the response JSON that came back as a result of a request against the BQ API.
>>>>>>>>>>>> - Desired Behaviour: (field0_1.field1_0, nestedStringField) -> field0_1.nestedStringField is what I am looking for.
>>>>>>>>>>>> - Info Logging Findings (In Lieu of a Stack Trace)
>>>>>>>>>>>>    - The Beam Schema was as expected with all renames applied.
>>>>>>>>>>>>    - The example I provided was heavily stripped down in order to isolate the problem. My work example, which is a bit impractical because it's part of some generic tooling, has 4 levels of nesting and also produces the correct output.
>>>>>>>>>>>>    - BigQueryUtils.toTableRow(Row) returns the expected TableRow in DirectRunner. In DataflowRunner, however, only the top-level renames were reflected in the TableRow; none of the renames in the nested fields were.
>>>>>>>>>>>>    - BigQueryUtils.toTableRow(Row) recurses on the Row values and uses the Row.schema to get the field names. This makes sense to me, but if a value is actually a Row, then its schema appears to be inconsistent with the top-level schema.
>>>>>>>>>>>> - My Current Workaround: I forked RenameFields and replaced the attachValues call in the expand method with a "deep" rename. This is obviously inefficient, and I will not be submitting a PR for that.
>>>>>>>>>>>> - JIRA ticket: https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> This transform is the same across all runners. A few comments on the test:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Using attachValues directly is error-prone (per the comment on the method). I recommend using the withFieldValue builders instead.
>>>>>>>>>>>>> - I recommend capturing the RenameFields PCollection into a local variable of type PCollection<Row> and printing out the schema (which you can get using the PCollection.getSchema method) to ensure that the output schema looks like you expect.
>>>>>>>>>>>>> - RenameFields doesn't flatten. So renaming field0_1.field1_0 -> nestedStringField results in field0_1.nestedStringField; if you wanted to flatten, then the better transform would be Select.fieldNameAs("field0_1.field1_0", "nestedStringField").
>>>>>>>>>>>>>
>>>>>>>>>>>>> This all being said, eyeballing the implementation of RenameFields makes me think that it is buggy in the case where you specify a top-level field multiple times like you do. I think it is simply adding the top-level field into the output schema multiple times, and the second time is with the field0_1 base name; I have no idea why your test doesn't catch this in the DirectRunner, as it's equally broken there. Could you file a JIRA about this issue and assign it to me?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette <bhule...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Matthew,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > The unit tests also seem to be disabled for this, so I don’t know if the PTransform behaves as expected.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone; it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That being said, we may only run these tests continuously with the DirectRunner; I'm not sure if we test them on all the runners like we do with ValidatesRunner tests.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That is correct. The tests are tests _of the transform_, so they run only on the DirectRunner. They are not tests of the runner, which is only responsible for correctly implementing Beam's primitives. The transform should not behave differently on different runners, except for fundamental differences in how they schedule work and checkpoint.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > The error message I’m receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there a stacktrace associated with this error? It would be helpful to see where the error is coming from.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I’m trying to use the RenameFields transform on nested fields prior to inserting into BigQuery. Insertion into BigQuery is successful with DirectRunner, but DataflowRunner has an issue with renamed nested fields. The error message I’m receiving, "Error while reading data, error message: JSON parsing error in row starting at position 0: No such field: nestedField.field1_0", suggests that BigQuery is trying to use the original name for the nested field and not the substitute name.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The code for RenameFields seems simple enough, but does it behave differently in different runners? Will a deep attachValues be necessary in order to get the nested renames to work across all runners? Is there something wrong in my code?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The unit tests also seem to be disabled for this, so I don’t know if the PTransform behaves as expected.
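One way to picture the "deep attachValues" asked about here is to rebuild each nested row against the corresponding nested schema of the target, rather than reusing the nested objects. This sketch uses hypothetical stand-in classes (`MiniSchema`, `MiniRow`, `DeepAttach`), not Beam's `Row.Builder`, and assumes the target schema has the same field order and types as the input, which holds for a pure rename. Rows nested inside arrays, iterables or maps would need the same treatment and are not handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a schema: field names plus an optional nested
// schema per field (null for primitive fields).
final class MiniSchema {
  final List<String> names;
  final List<MiniSchema> nested;

  MiniSchema(List<String> names, List<MiniSchema> nested) {
    this.names = names;
    this.nested = nested;
  }

  static MiniSchema single(String name, MiniSchema nestedOrNull) {
    return new MiniSchema(
        Collections.singletonList(name), Collections.singletonList(nestedOrNull));
  }
}

// Hypothetical stand-in for a row that carries its own schema object.
final class MiniRow {
  final MiniSchema schema;
  final List<Object> values;

  MiniRow(MiniSchema schema, List<Object> values) {
    this.schema = schema;
    this.values = values;
  }
}

final class DeepAttach {
  // Rebuilds the row against the target schema, recursing into nested rows so
  // that every nested object carries the nested schema the target declares.
  // The input row is not modified; null nested values are passed through.
  static MiniRow attach(MiniRow row, MiniSchema target) {
    List<Object> values = new ArrayList<>();
    for (int i = 0; i < row.values.size(); i++) {
      Object v = row.values.get(i);
      MiniSchema child = target.nested.get(i);
      values.add(child != null && v != null ? attach((MiniRow) v, child) : v);
    }
    return new MiniRow(target, values);
  }

  public static void main(String[] args) {
    MiniSchema inner = MiniSchema.single("field1_0", null);
    MiniSchema renamedInner = MiniSchema.single("nestedStringField", null);
    MiniRow nested = new MiniRow(inner, Collections.<Object>singletonList("value1_0"));
    // Shallow result: top-level schema updated, nested object still on `inner`.
    MiniRow shallow = new MiniRow(MiniSchema.single("nestedField", renamedInner),
        Collections.<Object>singletonList(nested));

    MiniRow deep = attach(shallow, shallow.schema);
    MiniRow fixedNested = (MiniRow) deep.values.get(0);
    // Prints nestedStringField=value1_0
    System.out.println(fixedNested.schema.names.get(0) + "=" + fixedNested.values.get(0));
  }
}
```

Compared with a serialization round trip, this repairs the object without encoding, but it still allocates a new row at every nested level, which is consistent with Matthew's own note that such a deep rename is inefficient.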
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> package ca.loblaw.cerebro.PipelineControl;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> import com.google.api.services.bigquery.model.TableReference;
>>>>>>>>>>>>>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.Schema;
>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> import java.io.File;
>>>>>>>>>>>>>>>>> import java.util.Arrays;
>>>>>>>>>>>>>>>>> import java.util.HashSet;
>>>>>>>>>>>>>>>>> import java.util.stream.Collectors;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> import static java.util.Arrays.asList;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public class BQRenameFields {
>>>>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>>>         DataflowPipelineOptions options =
>>>>>>>>>>>>>>>>>                 PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>>>>>>>>>>>>>>>         // Stage every entry on the local classpath so the Dataflow workers get the same jars.
>>>>>>>>>>>>>>>>>         options.setFilesToStage(
>>>>>>>>>>>>>>>>>                 Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>>>>>>>>>>>>>>>                         .map(entry -> (new File(entry)).toString())
>>>>>>>>>>>>>>>>>                         .collect(Collectors.toList()));
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         // field0_1 is a nested row with a single nullable string field, field1_0.
>>>>>>>>>>>>>>>>>         Schema nestedSchema = Schema.builder()
>>>>>>>>>>>>>>>>>                 .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>>>>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>>>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>>>>>>>>>>>>>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>>>>>>>>>>>>>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>>>>>>>>>>>>>>>>         Schema rowSchema = Schema.builder()
>>>>>>>>>>>>>>>>>                 .addFields(field, nested, runner)
>>>>>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>>>>>         // Values are attached positionally, in schema field order; field0_2 records the runner class.
>>>>>>>>>>>>>>>>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>>>>>>>>>>>>>>>>                 "value0_0",
>>>>>>>>>>>>>>>>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>>>>>>>>>>>>>>>>                 options.getRunner().toString());
>>>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>>>                 .apply(Create.of(testRow).withRowSchema(rowSchema))
>>>>>>>>>>>>>>>>>                 // Rename three top-level fields plus the nested field0_1.field1_0.
>>>>>>>>>>>>>>>>>                 .apply(RenameFields.<Row>create()
>>>>>>>>>>>>>>>>>                         .rename("field0_0", "stringField")
>>>>>>>>>>>>>>>>>                         .rename("field0_1", "nestedField")
>>>>>>>>>>>>>>>>>                         .rename("field0_1.field1_0", "nestedStringField")
>>>>>>>>>>>>>>>>>                         .rename("field0_2", "runner"))
>>>>>>>>>>>>>>>>>                 .apply(BigQueryIO.<Row>write()
>>>>>>>>>>>>>>>>>                         .to(new TableReference()
>>>>>>>>>>>>>>>>>                                 .setProjectId("lt-dia-lake-exp-raw")
>>>>>>>>>>>>>>>>>                                 .setDatasetId("prototypes")
>>>>>>>>>>>>>>>>>                                 .setTableId("matto_renameFields"))
>>>>>>>>>>>>>>>>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>>>>>>>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>>>>>>>>>>>>                         .withSchemaUpdateOptions(new HashSet<>(
>>>>>>>>>>>>>>>>>                                 asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>>>>>>>>>>>>>>>>                         .useBeamSchema());
>>>>>>>>>>>>>>>>>         pipeline.run();
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>> }
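For the renames in the pipeline above, the output schema Matthew describes as the desired behaviour (nested names renamed in place, since RenameFields doesn't flatten) can be computed with a small schema-only model. This is a hypothetical sketch, not how RenameFields is implemented: `RenamePaths` and its map-based schema are illustrative only. It assumes every rename key is a dotted path into the *input* schema, so renaming both `field0_1` and `field0_1.field1_0` composes into a single `nestedField` entry instead of adding the top-level field twice, and it rejects renames that would produce duplicate names at the same level.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical schema-only model (not Beam's Schema): an ordered map from
// field name to either a type name (String) or a nested schema (Map).
final class RenamePaths {
  // Applies renames keyed by dotted paths into the input schema; the values
  // are new leaf names. Nesting is preserved, so nothing is flattened.
  @SuppressWarnings("unchecked")
  static Map<String, Object> apply(
      Map<String, Object> schema, Map<String, String> renames, String prefix) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : schema.entrySet()) {
      String path = prefix.isEmpty() ? field.getKey() : prefix + "." + field.getKey();
      String newName = renames.getOrDefault(path, field.getKey());
      if (out.containsKey(newName)) {
        throw new IllegalArgumentException("Duplicate field name after rename: " + newName);
      }
      Object type = field.getValue();
      // Recurse with the *input* path, so nested renames are resolved against
      // the original names regardless of how the parent field was renamed.
      out.put(newName,
          type instanceof Map ? apply((Map<String, Object>) type, renames, path) : type);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("field1_0", "STRING");
    Map<String, Object> input = new LinkedHashMap<>();
    input.put("field0_0", "STRING");
    input.put("field0_1", nested);
    input.put("field0_2", "STRING");

    // The same four renames as the pipeline in the thread.
    Map<String, String> renames = new LinkedHashMap<>();
    renames.put("field0_0", "stringField");
    renames.put("field0_1", "nestedField");
    renames.put("field0_1.field1_0", "nestedStringField");
    renames.put("field0_2", "runner");

    // Prints {stringField=STRING, nestedField={nestedStringField=STRING}, runner=STRING}
    System.out.println(apply(input, renames, ""));
  }
}
```

Comparing an expected schema like this with `PCollection.getSchema()` on the RenameFields output is the check Reuven recommends. Note that with the nested-row bug discussed in this thread, that check can pass (Matthew reports the Beam Schema was as expected) while nested Row objects still carry the old names.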