Matthias,

You can use a CROSS JOIN UNNEST, as mentioned very briefly in the docs [1].

Something like this should work:

SELECT
  id, customerid, productid, quantity, ...
FROM
  Orders
CROSS JOIN UNNEST(entries) AS items (productid, quantity, unit_price, discount);
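
In case it helps to see what the join returns, here is a plain-Java sketch of
the semantics. It is not Flink code: the class names (UnnestSketch, Order,
Entry, FlatRow) and the sample data are made up for illustration, and
unit_price and discount are left out for brevity. Each order is paired with
every element of its own entries array, so an order with an empty array
contributes no rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: mimics the rows produced by CROSS JOIN UNNEST(entries).
class UnnestSketch {

    // One element of the nested "entries" array (hypothetical, trimmed to two fields).
    static final class Entry {
        final int productid;
        final int quantity;

        Entry(int productid, int quantity) {
            this.productid = productid;
            this.quantity = quantity;
        }
    }

    // One row of the source table, carrying its nested array.
    static final class Order {
        final int id;
        final int customerid;
        final List<Entry> entries;

        Order(int id, int customerid, List<Entry> entries) {
            this.id = id;
            this.customerid = customerid;
            this.entries = entries;
        }
    }

    // One row of the flattened result: order columns plus entry columns.
    static final class FlatRow {
        final int id;
        final int customerid;
        final int productid;
        final int quantity;

        FlatRow(int id, int customerid, int productid, int quantity) {
            this.id = id;
            this.customerid = customerid;
            this.productid = productid;
            this.quantity = quantity;
        }

        @Override
        public String toString() {
            return id + ", " + customerid + ", " + productid + ", " + quantity;
        }
    }

    // Emits one FlatRow per (order, entry) pair; empty arrays yield nothing.
    static List<FlatRow> unnest(List<Order> orders) {
        List<FlatRow> out = new ArrayList<>();
        for (Order o : orders) {
            for (Entry e : o.entries) {
                out.add(new FlatRow(o.id, o.customerid, e.productid, e.quantity));
            }
        }
        return out;
    }

    // Made-up sample data: two entries, no entries, one entry.
    static List<Order> sampleOrders() {
        return Arrays.asList(
                new Order(1, 100, Arrays.asList(new Entry(7, 2), new Entry(8, 1))),
                new Order(2, 101, Collections.<Entry>emptyList()),
                new Order(3, 102, Collections.singletonList(new Entry(9, 5))));
    }

    public static void main(String[] args) {
        for (FlatRow row : unnest(sampleOrders())) {
            System.out.println(row);
        }
    }
}
```

With this sample data the result has three rows (two for order 1, one for
order 3), and aggregates such as SUM(quantity) can then be computed over the
flattened rows rather than over a single array position.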

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#array-expansion

Regards,
David

On Sun, Feb 20, 2022 at 2:25 AM Matthias Broecheler <matth...@dataeng.ai>
wrote:

> Hey Flinksters,
>
> I'm reading a nested JSON object into a table and would like to access the
> nested rows inside an array. Is there a way to flatten them so that I get a
> table with the nested rows?
>
> So far, I've only been able to figure out how to access a specific element
> inside the array using the "at" method but I'm trying to flatten the nested
> rows into a table and the arrays can have variable length. Below is a code
> snippet of what I have thus far but notice how I'm only accessing the first
> element in each array.
>
> How do you do this in Flink? Apologies if this is obvious - I wasn't able
> to find an example or documentation and would appreciate any help.
>
> Thank you,
> Matthias
>
> ---------------
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>
> TableDescriptor jsonTable = TableDescriptor.forConnector("filesystem")
>         .schema(Schema.newBuilder()
>                 .column("id", DataTypes.INT())
>                 .column("customerid", DataTypes.INT())
>                 .column("time", DataTypes.INT())
>                 .column("entries", DataTypes.ARRAY(DataTypes.ROW(
>                         DataTypes.FIELD("productid", DataTypes.INT()),
>                         DataTypes.FIELD("quantity", DataTypes.INT()),
>                         DataTypes.FIELD("unit_price", DataTypes.DECIMAL(9,3)),
>                         DataTypes.FIELD("discount", DataTypes.DECIMAL(9,3))
>                         )))
>                 .build())
>         .option("path", C360Test.RETAIL_DATA_DIR.toAbsolutePath() + "/orders.json")
>         .format("json")
>         .build();
>
>
>
> tEnv.createTable("Orders",jsonTable);
> Table orders = tEnv.from("Orders");
>
> Table flattenEntries = orders.select($("entries").at(1).get("quantity").sum().as("totalquant"));
>
>
