Yeah I think a minimal example would be really helpful. Then at the very
least we should be able to bisect to identify the breaking change. For
creating a test BigQuery database you might look at using the TestBigQuery
rule [1] (rough sketch below). There are several usage examples in the Beam repo [2].

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
[2]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java
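
To illustrate, something along these lines could serve as a starting point.
It's an untested sketch: the class and field names are placeholders, and the
rule's helper methods (e.g. tableSpec()) are worth double-checking against the
source in [1]:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
    import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.Schema.FieldType;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.Row;
    import org.junit.Rule;
    import org.junit.Test;

    public class MapArrayWriteIT {

      // Same shape as the schema described below: an array of string-to-string maps.
      private static final Schema ROW_SCHEMA =
          Schema.builder()
              .addNullableField(
                  "experiments",
                  FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING)))
              .build();

      @Rule public transient TestPipeline pipeline = TestPipeline.create();

      // TestBigQuery provisions (and cleans up) a temporary table for the test.
      @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(ROW_SCHEMA);

      @Test
      public void writesArrayOfMaps() {
        Map<String, String> experiment = new HashMap<>();
        experiment.put("name", "forename");
        experiment.put("type", "person");

        Row row =
            Row.withSchema(ROW_SCHEMA)
                .addValue(Collections.singletonList(experiment))
                .build();

        pipeline
            .apply(Create.of(row).withRowSchema(ROW_SCHEMA))
            .apply(
                BigQueryIO.<Row>write()
                    .to(bigQuery.tableSpec())
                    .withFormatFunction(BigQueryUtils.toTableRow())
                    .withSchema(BigQueryUtils.toTableSchema(ROW_SCHEMA)));

        pipeline.run().waitUntilFinish();
      }
    }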

On Sun, Jul 17, 2022 at 4:00 AM Jimmy Headdon <jimmy.head...@mojiworks.com>
wrote:

> Hello - I've had no joy investigating this further, nor finding the
> specific change in v2.30 that caused this break in behaviour.  I have
> tested with v2.40, to no avail.
>
> Would it be useful if I put together an example project and submitted it
> to this mailing list?  It would need a BigQuery database (or a mock,
> perhaps) to see this in action - any recommendations on how I could achieve
> this?
>
>
> Thanks again
>
> On Thu, 23 Jun 2022 at 13:30, Jimmy Headdon <jimmy.head...@mojiworks.com>
> wrote:
>
>> Hello everyone
>>
>> Thanks for your continued efforts on the project and for help with my
>> previous request!  I've run into an issue where upgrading to Beam v2.30 (or
>> any version up to and including v2.39) causes a breaking error when I
>> attempt to write an array of Map objects to Google's BigQuery:
>>
>> "Error while reading data, error message: JSON parsing error in row
>> starting at position 0: Nested arrays not allowed."
>>
>>
>> I'll explain my setup below, but for reference this behaviour worked
>> against Beam's Java library versions 2.19 through 2.29 (I checked them all,
>> and it definitely breaks from v2.30 onwards).
>>
>> *Pipeline Setup*
>> I have a Java pipeline that reads TableRow objects from BigQuery,
>> performs some fairly basic aggregation through Beam SQL and some in-memory
>> transformations, and writes the results back to a different table in
>> BigQuery.
>>
>> The initial BQ TableRow schema is a handful of STRING, INTEGER and
>> BOOLEAN fields, plus a single (nullable) RECORD field.  This RECORD field
>> is the one that causes problems: it is a REPEATED record with simple
>> STRING fields in each instance.
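>>
>> To make that concrete, the repeated RECORD looks roughly like this (just
>> an illustration of the shape; the other top-level fields are omitted):
>>
>>     new TableFieldSchema()
>>         .setName("experiments")
>>         .setType("RECORD")
>>         .setMode("REPEATED")
>>         .setFields(Arrays.asList(
>>             new TableFieldSchema().setName("name").setType("STRING"),
>>             new TableFieldSchema().setName("type").setType("STRING")))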
>>
>> When converting the BQ TableRow to Beam Row objects, I'm setting a row
>> schema for the above BQ RECORD as follows (I've also tried a Row field
>> instead; see the alternate schema at the very bottom of this email):
>>
>>     .addNullableField("experiments",
>>         FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING)))
>>
>>
>> I have a custom ParDo function to convert the TableRow objects to Row
>> (included at the very bottom of this email for reference).  It creates a
>> Map instance for every repeated record from BQ and adds it to an ArrayList,
>> which is then added to the new Row via Row.withSchema(x).addValue (I've
>> also tried .addArray).
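>>
>> Boiling it down to just the problem field, the construction looks roughly
>> like this ("sourceRecords" is a placeholder for the repeated records pulled
>> out of the incoming TableRow, and the other simple fields are omitted):
>>
>>     Schema rowSchema =
>>         Schema.builder()
>>             .addNullableField("experiments",
>>                 FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING)))
>>             .build();
>>
>>     Row row =
>>         Row.withSchema(rowSchema)
>>             .addValue(getRecords(sourceRecords))  // have also tried .addArray here
>>             .build();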
>>
>> When writing back to BigQuery (I use
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow()), the
>> TableRow is created, but the write then fails with the aforementioned
>> "Nested arrays not allowed" error.
>>
>> I'm fairly confident the issue occurs in the initial conversion from
>> TableRow to Row, as I can print row.toString() and the outputs differ.
>> It's worth noting that printing the TableRow before conversion shows no
>> difference between versions.
>>
>> System.out.println(row.toString());
>>
>>
>> Here is the output for this RECORD field with Beam v2.29:
>>
>> [{"name": “forename”, "type": “person”}]
>>
>>
>> And the same input data printed with Beam v2.30 (or higher):
>>
>> [[{"key":"name","value":"forename"},{"key":"type","value":"person"}]]
>>
>>
>> It has become a nested structure with Key and Value fields.
>>
>>
>> Nothing obvious stands out in the v2.30 release notes; I'm thinking it
>> could be an upstream library change, or possibly an indirect Coder change?
>> Any ideas?
>>
>> Thanks again, let me know if I can provide any other snippets, input
>> data, etc.
>>
>>
>> *TableRow to Row Conversion*
>>
>>     // Convert each repeated BQ record into a simple Map<String, String>.
>>     public ArrayList<Map<String, String>> getRecords(ArrayList<Map<String, Object>> records) {
>>         if (records == null) {
>>             return null;
>>         }
>>
>>         ArrayList<Map<String, String>> recordRows = new ArrayList<Map<String, String>>();
>>
>>         for (Map<String, Object> record : records) {
>>             Map<String, String> e = new HashMap<String, String>();
>>             e.put("name", (String) record.getOrDefault("name", ""));
>>             e.put("type", (String) record.getOrDefault("type", ""));
>>             recordRows.add(e);
>>         }
>>
>>         return recordRows;
>>     }
>>
>>
>> *Alternate Row Schema*
>>
>>     public static Schema recordsRowSchema =
>>         Schema.builder()
>>               .addNullableField("name", FieldType.STRING)
>>               .addNullableField("type", FieldType.STRING)
>>               .build();
>>
>>     .addNullableField("records",
>>         FieldType.array(FieldType.row(recordsRowSchema)))
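>>
>> With that variant, each repeated record is built as a Row rather than a
>> Map, e.g.:
>>
>>     Row record =
>>         Row.withSchema(recordsRowSchema)
>>             .addValues("forename", "person")
>>             .build();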
>>
>
