Hi Matthew,

> The unit tests also seem to be disabled for this, so I don't know whether
> the PTransform behaves as expected.
The exclusion of NeedsRunner tests is just a quirk in our testing framework. NeedsRunner indicates that a test suite can't be executed with the SDK alone; it needs a runner. So that exclusion just makes sure we don't run the test when we're verifying the SDK by itself in the :sdks:java:core:test task. The test is still run in other tasks where we have a runner, most notably in the Java PreCommit [1], where we run it as part of the :runners:direct-java:test task.

That being said, we may only run these tests continuously with the DirectRunner; I'm not sure whether we test them on all the runners the way we do with ValidatesRunner tests.

> The error message I'm receiving, "Error while reading data, error message:
> JSON parsing error in row starting at position 0: No such field:
> nestedField.field1_0", suggests that BigQuery is trying to use the original
> name for the nested field and not the substitute name.

Is there a stack trace associated with this error? It would be helpful to see where the error is coming from.

Brian

[1] https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/

On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang <matthew.ouy...@gmail.com> wrote:

> I'm trying to use the RenameFields transform on nested fields prior to
> inserting into BigQuery. Insertion into BigQuery succeeds with the
> DirectRunner, but the DataflowRunner has an issue with renamed nested
> fields. The error message I'm receiving, "Error while reading data, error
> message: JSON parsing error in row starting at position 0: No such field:
> nestedField.field1_0", suggests that BigQuery is trying to use the original
> name for the nested field and not the substitute name.
>
> The code for RenameFields seems simple enough, but does it behave
> differently in different runners? Will a deep attachValues be necessary in
> order to get the nested renames to work across all runners? Is there
> something wrong in my code?
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>
> The unit tests also seem to be disabled for this, so I don't know whether
> the PTransform behaves as expected.
>
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>
>> package ca.loblaw.cerebro.PipelineControl;
>>
>> import com.google.api.services.bigquery.model.TableReference;
>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.schemas.Schema;
>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.values.Row;
>>
>> import java.io.File;
>> import java.util.Arrays;
>> import java.util.HashSet;
>> import java.util.stream.Collectors;
>>
>> import static java.util.Arrays.asList;
>>
>> public class BQRenameFields {
>>   public static void main(String[] args) {
>>     PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>     DataflowPipelineOptions options =
>>         PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>     options.setFilesToStage(
>>         Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>             .map(entry -> new File(entry).toString())
>>             .collect(Collectors.toList()));
>>
>>     Pipeline pipeline = Pipeline.create(options);
>>
>>     Schema nestedSchema =
>>         Schema.builder()
>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>             .build();
>>     Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>     Schema.Field nested =
>>         Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>     Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>     Schema rowSchema = Schema.builder().addFields(field, nested, runner).build();
>>     Row testRow =
>>         Row.withSchema(rowSchema)
>>             .attachValues(
>>                 "value0_0",
>>                 Row.withSchema(nestedSchema).attachValues("value1_0"),
>>                 options.getRunner().toString());
>>
>>     pipeline
>>         .apply(Create.of(testRow).withRowSchema(rowSchema))
>>         .apply(
>>             RenameFields.<Row>create()
>>                 .rename("field0_0", "stringField")
>>                 .rename("field0_1", "nestedField")
>>                 .rename("field0_1.field1_0", "nestedStringField")
>>                 .rename("field0_2", "runner"))
>>         .apply(
>>             BigQueryIO.<Row>write()
>>                 .to(
>>                     new TableReference()
>>                         .setProjectId("lt-dia-lake-exp-raw")
>>                         .setDatasetId("prototypes")
>>                         .setTableId("matto_renameFields"))
>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>                 .withSchemaUpdateOptions(
>>                     new HashSet<>(asList(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
>>                 .useBeamSchema());
>>     pipeline.run();
>>   }
>> }
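As intuition for the "deep attachValues" question above: a top-level rename only has to rewrite the outer field names, while a dotted-path rename such as "field0_1.field1_0" has to recurse into the nested row and rebuild it. The toy sketch below is plain Java (not Beam code), modelling a Row as a Map; the class and method names are illustrative only, not anything from the Beam API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NestedRenameSketch {
    // Rename one field identified by a dotted path, e.g.
    // "field0_1.field1_0" -> "nestedStringField". Returns a new map;
    // the input map is left untouched.
    @SuppressWarnings("unchecked")
    static Map<String, Object> rename(Map<String, Object> row, String path, String newName) {
        Map<String, Object> out = new LinkedHashMap<>();
        int dot = path.indexOf('.');
        if (dot < 0) {
            // Base case: a top-level rename only rewrites the outer key.
            for (Map.Entry<String, Object> e : row.entrySet()) {
                out.put(e.getKey().equals(path) ? newName : e.getKey(), e.getValue());
            }
        } else {
            // Dotted path: recurse into the nested "row" and rebuild it,
            // which is why nested renames need a deep traversal.
            String head = path.substring(0, dot);
            String rest = path.substring(dot + 1);
            for (Map.Entry<String, Object> e : row.entrySet()) {
                Object value = e.getValue();
                if (e.getKey().equals(head) && value instanceof Map) {
                    value = rename((Map<String, Object>) value, rest, newName);
                }
                out.put(e.getKey(), value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("field1_0", "value1_0");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("field0_0", "value0_0");
        row.put("field0_1", nested);

        // Shallow rename: only the outer key changes, nested keys are untouched.
        System.out.println(rename(row, "field0_1", "nestedField"));
        // Deep rename: the nested map is rebuilt with the new inner key.
        System.out.println(rename(row, "field0_1.field1_0", "nestedStringField"));
    }
}
```

If the renamed schema is attached only at the top level, the nested values still carry their old field names, which is consistent with BigQuery rejecting `nestedField.field1_0` as "no such field".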