Yeah, I think a minimal example would be really helpful. Then at the very least we should be able to bisect to identify the breaking change. For creating a test BigQuery database you might look at using the TestBigQuery rule [1]. There are several usage examples in the Beam repo [2].
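Roughly, a reproduction test could be wired up like the sketch below. This is untested and from memory - the class name, schema, field names and write configuration are just placeholders chosen to exercise the array-of-map case, so double-check the TestBigQuery/TestPipeline rule usage against the linked example [2], and you'll likely also need the usual GCP pipeline options (project, temp location) set for the write to run.

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.junit.Rule;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ArrayOfMapBigQueryIT {

    // Placeholder schema with the array-of-map field that appears to trigger the error.
    private static final Schema SCHEMA =
        Schema.builder()
            .addStringField("id")
            .addNullableField(
                "experiments",
                FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING)))
            .build();

    @Rule public final transient TestPipeline pipeline = TestPipeline.create();

    // TestBigQuery provisions a throwaway table for the test and tears it down afterwards.
    @Rule public final transient TestBigQuery bigQuery = TestBigQuery.create(SCHEMA);

    @Test
    public void writesArrayOfMapField() {
        Map<String, String> experiment = new HashMap<>();
        experiment.put("name", "forename");
        experiment.put("type", "person");

        Row row =
            Row.withSchema(SCHEMA)
                .addValues("row-1", Collections.singletonList(experiment))
                .build();

        pipeline
            .apply(Create.of(row).withRowSchema(SCHEMA))
            .apply(
                BigQueryIO.<Row>write()
                    .to(bigQuery.tableSpec())
                    .withSchema(BigQueryUtils.toTableSchema(SCHEMA))
                    .withFormatFunction(BigQueryUtils::toTableRow)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

        // Expectation: succeeds on 2.29, surfaces the "Nested arrays not allowed"
        // error on 2.30+ when the rows are written.
        pipeline.run().waitUntilFinish();
    }
}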
[1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
[2] https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java

On Sun, Jul 17, 2022 at 4:00 AM Jimmy Headdon <jimmy.head...@mojiworks.com> wrote:

> Hello - I've had no joy investigating this further, nor finding the
> specific change in v2.30 that caused this break in behaviour. I have
> tested with v2.40, to no avail.
>
> Would it be useful if I put together an example project and submitted it
> to this mailing list? It would need a BigQuery database (or a mock,
> perhaps) to see this in action - any recommendations on how I could
> achieve this?
>
> Thanks again
>
> On Thu, 23 Jun 2022 at 13:30, Jimmy Headdon <jimmy.head...@mojiworks.com> wrote:
>
>> Hello everyone
>>
>> Thanks for your continued efforts on the project, and for the help with
>> my previous request! I've run into an issue where upgrading to Beam v2.30
>> (or any version up to and including v2.39) causes a breaking error when I
>> attempt to write an array of Map objects to Google's BigQuery:
>>
>> "Error while reading data, error message: JSON parsing error in row
>> starting at position 0: Nested arrays not allowed."
>>
>> I'll explain my setup below, but for reference this behaviour works
>> against Beam's Java SDK versions 2.19 through 2.29 (I checked them all,
>> and it definitely breaks from v2.30 onwards).
>>
>> *Pipeline Setup*
>> I have a Java pipeline that reads TableRow objects from BigQuery,
>> performs some fairly basic aggregation through Beam SQL and some
>> in-memory transformations, and writes the results back to a different
>> table in BigQuery.
>>
>> The initial BQ TableRow schema is a handful of STRING, INTEGER and
>> BOOLEAN fields, plus a single (nullable) RECORD field. This RECORD field
>> is the one that causes problems; it is a REPEATED record with simple
>> STRING fields in each instance.
>>
>> When converting the BQ TableRow to Beam Row objects, I set the row
>> schema for the above BQ RECORD as follows (I've also tried a Row-based
>> schema - see the very bottom of this email):
>>
>> .addNullableField("experiments",
>>     FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING)))
>>
>> I have a custom ParDo function that converts the TableRow objects to Row
>> (also at the very bottom of this email, for reference). It creates a Map
>> instance for every repeated record from BQ and adds it to an ArrayList,
>> which is then added to the new Row through Row.withSchema(x).addValue
>> (I've also tried .addArray).
>>
>> When writing back to BigQuery (I use
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow()), the
>> TableRow is created but then throws the aforementioned "Nested arrays
>> not allowed" error.
>>
>> I'm fairly confident the issue occurs in the initial conversion from
>> TableRow to Row, as I can print row.toString() and the outputs differ
>> between versions. It's worth noting that printing the TableRow before
>> conversion shows no difference between versions.
>>
>> System.out.println(row.toString());
>>
>> Here is the output for this RECORD field with Beam v2.29:
>>
>> [{"name": "forename", "type": "person"}]
>>
>> And the same input data printed with Beam v2.30 (or higher):
>>
>> [[{"key": "name", "value": "forename"}, {"key": "type", "value": "person"}]]
>>
>> It has become a nested structure with Key and Value fields.
>>
>> Nothing obvious stands out in the v2.30 release notes; I'm thinking it
>> could be an upstream library change, or possibly an indirect Coder
>> change? Any ideas?
>>
>> Thanks again - let me know if I can provide any other snippets, input
>> data, etc.
>>
>> *TableRow to Row Conversion*
>>
>> // Converts the repeated BQ RECORD entries into a list of Maps for the Beam Row.
>> public ArrayList<Map<String, String>> getRecords(ArrayList<Map<String, Object>> records) {
>>     if (records == null) {
>>         return null;
>>     }
>>
>>     ArrayList<Map<String, String>> recordRows = new ArrayList<Map<String, String>>();
>>
>>     for (Map<String, Object> record : records) {
>>         Map<String, String> e = new HashMap<String, String>();
>>         e.put("name", (String) record.getOrDefault("name", ""));
>>         e.put("type", (String) record.getOrDefault("type", ""));
>>
>>         recordRows.add(e);
>>     }
>>
>>     return recordRows;
>> }
>>
>> *Alternate Row Schema*
>>
>> public static Schema recordsRowSchema =
>>     Schema.builder()
>>         .addNullableField("name", FieldType.STRING)
>>         .addNullableField("type", FieldType.STRING)
>>         .build();
>>
>> .addNullableField("records",
>>     FieldType.array(FieldType.row(recordsRowSchema)))
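One other thought while looking at the snippets: if you go with the alternate row-based schema at the bottom, the conversion helper would need to emit nested Row values rather than Maps. A rough, untested sketch of what I mean, reusing your recordsRowSchema and field names - it might sidestep the map-to-key/value expansion you're seeing, though I haven't verified that:

// Builds nested Row values matching recordsRowSchema instead of Map entries.
public ArrayList<Row> getRecordRows(ArrayList<Map<String, Object>> records) {
    if (records == null) {
        return null;
    }

    ArrayList<Row> recordRows = new ArrayList<>();

    for (Map<String, Object> record : records) {
        // Each repeated BQ record becomes a nested Row with "name" and "type" fields.
        recordRows.add(
            Row.withSchema(recordsRowSchema)
                .addValues(
                    (String) record.getOrDefault("name", ""),
                    (String) record.getOrDefault("type", ""))
                .build());
    }

    return recordRows;
}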