wuchong commented on a change in pull request #13081:
URL: https://github.com/apache/flink/pull/13081#discussion_r467729328

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
##########
@@ -130,6 +133,39 @@ public RowData deserialize(byte[] message) throws IOException {
         }
     }
+    @Override

Review comment:
Remove the implementation of `deserialize(byte[] message)`, because it is no longer used.

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
##########
@@ -130,6 +133,39 @@ public RowData deserialize(byte[] message) throws IOException {
         }
     }
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        try {
+            final JsonNode root = objectMapper.readTree(message);
+
+            if (root.isArray()) {
+                List<RowData> rowDataList = new ArrayList<>();
+                ArrayNode arrayNode = (ArrayNode) root;
+                for (int i = 0; i < arrayNode.size(); ++i) {
+                    RowData result = (RowData) runtimeConverter.convert(arrayNode.get(i));
+                    if (result != null) {
+                        rowDataList.add(result);
+                    }
+                }
+
+                // collect final results in a separate for-loop in case of parse errors.

Review comment:
I think the parse errors can still be caught if we collect each parsed row eagerly.

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -212,6 +212,53 @@ The following table lists the type mapping from Flink type to JSON type.
     </tbody>
 </table>
+How is json string converted to Flink SQL Row

Review comment:
Could you add this under a "Features" section, like the other pages? Also, we can use "Allow top-level JSON Arrays" as the sub-section title.

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -212,6 +212,53 @@ The following table lists the type mapping from Flink type to JSON type.
     </tbody>
 </table>
+How is json string converted to Flink SQL Row
+----------------
+Usually, we assume the outer most of json string is a json object. Then the json object is converted to one SQL row.
+

Review comment:
Remove the empty line.

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -212,6 +212,53 @@ The following table lists the type mapping from Flink type to JSON type.
     </tbody>
 </table>
+How is json string converted to Flink SQL Row
+----------------
+Usually, we assume the outer most of json string is a json object. Then the json object is converted to one SQL row.
+
+
+There are some cases that, the outer most of json string is a json array, and we want to explode the array to
+multiple records, each one of the array is a json object which is converted to one row. Flink JSON Format supports
+read such data implicitly.
+
+For example, for the following SQL DDL:
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE user_behavior (
+  col1 BIGINT,
+  col2 DOUBLE,
+  col3 VARCHAR,
+  col4 BOOLEAN
+) WITH (
+  'format' = 'json',
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+and with following json string:
+<div class="codetabs" markdown="1">
+<div data-lang="JSON" markdown="1">
+{% highlight json %}
+[
+  {
+    "col1": 123,
+    "col2": 12.34,
+    "col3": "str1",
+    "col4": true
+  },
+  {
+    "col1": 456,
+    "col2": 45.67,
+    "col3": "str2",
+    "col4": false
+  },
+]
+{% endhighlight %}
+</div>
+</div>
+Flink JSON Format will produce 2 records.

Review comment:
```suggestion
Flink JSON Format will produce 2 rows `(123, "a")` and `(456, "b")` with both of following two json string.

The top-level is JSON Array:
{% highlight json %}
[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
{% endhighlight %}

The top-level is JSON Object:
{% highlight json %}
{"col1": 123, "col2": "a"}
{"col1": 456, "col2": "b"}
{% endhighlight %}
```
We can simplify the example to 2 columns, but it should also show the resulting rows.

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -212,6 +212,53 @@ The following table lists the type mapping from Flink type to JSON type.
     </tbody>
 </table>
+How is json string converted to Flink SQL Row
+----------------
+Usually, we assume the outer most of json string is a json object. Then the json object is converted to one SQL row.

Review comment:
outer most -> top-level

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org