Hello everyone

Thanks for your continued efforts on the project and for help with my
previous request!  I've run into an issue where upgrading to Beam v2.30 (or
any version up to and including v2.39) causes a breaking error when I
attempt to write an array of Map objects to Google's BigQuery:

"Error while reading data, error message: JSON parsing error in row
starting at position 0: Nested arrays not allowed."


I'll explain my setup below, but for reference this behaviour worked
against Beam's Java library versions 2.19 through 2.29 (I checked them all,
and it definitely breaks from v2.30 onwards).

*Pipeline Setup*
I have a Java pipeline that reads TableRow objects from BigQuery, performs
some fairly basic aggregation through Beam SQL and some in-memory
transformations, and writes the results back to a different table in
BigQuery.

The initial BQ TableRow schema is a handful of STRING, INTEGER and BOOLEAN
objects, plus a single (nullable) RECORD field.  This RECORD field is the
one that causes problems, it is a REPEATED record with simple STRING fields
for each instance.

When converting the BQ TableRow to Beam Row objects, I'm setting a row
schema for the above BQ RECORD as follows (I've also tried a Row field, at
the very bottom of this email, for reference):

.addNullableField("experiments",
FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING)))


I have a custom ParDo function to convert the TableRow objects to Row (at
the very bottom of this email, for reference).  It creates a Map instance
for every repeated record from BQ, and adds it to an ArrayList which is
then added to the new Row through Row.withSchema(x).addValue (I've also
tried .addArray).

When writing back to BigQuery (I use
org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow()) the
TableRow is created but then throws the aforementioned "Nested arrays not
allowed" error.

I'm fairly confident the issue occurs in the initial conversion from
TableRow to Row, as I can print row.toString() and the outputs differ.
It's worth noting that printing the TableRow before conversion shows no
difference between versions.

System.out.println(row.toString());


Here is the output for this RECORD field with Beam v2.29:

[{"name": “forename”, "type": “person”}]


And the same input data printed with Beam v2.30 (or higher):

[[{"key":"name","value”:”forename”},{“key":"type","value”:”person”}]]


It has become a nested structure with Key and Value fields.


Nothing obvious stands out in the v2.30 release notes, I'm thinking it
could be an upstream library change or possibly an indirect Coder change?
Any ideas?

Thanks again, let me know if I can provide any other snippets, input data,
etc.


*TableRow to Row Conversion*

        public ArrayList<Map<String, String>>
getRecords(ArrayList<Map<String, Object>> records) {

            if (records == null) {

                return null;

            }


            ArrayList<Map<String, String>> recordRows = new
ArrayList<Map<String, String>>();


            for (Map<String, Object> record: records) {

                Map<String, String> e = new HashMap<String, String>();

                e.put("name", (String)experiment.getOrDefault("name", ""));

                e.put("type", (String)experiment.getOrDefault("type", ""));


                recordRows.add(e);

            }


            return recordRows;

        }


*Alternate Row Schema*

    public static Schema recordsRowSchema =
        Schema.builder()
              .addNullableField("name", FieldType.STRING)
              .addNullableField("type", FieldType.STRING)
              .build();

    .addNullableField("records",
FieldType.array(FieldType.row(recordsRowSchema)))

Reply via email to