Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
比如:
SourceT: (
uuid String,
body_data ARRAY<ROW<field1 String, field2 String>>
)
SinkT (
result ARRAY<ROW< uuid string, body_data String, body_data.fild1
String, body_data.fild2 String>>
)
Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid,
null,body_data[2]. field, body_data[2]. field2)] as result
希望对你有帮助
> 2023年11月22日 20:54,casel.chen <[email protected]> 写道:
>
> 输入:
>
> {
>
> "uuid":"XXXX",
>
> "body_data":
> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
>
> }
>
>
>
>
> 输出:
>
> [
>
> {
>
> "uuid": "XXXX",
>
> "body_data: null,
>
> "body_data.fild1": "123”,
>
> "body_data.fild2": "234"
>
> },
>
> {
>
> "uuid": "XXXX",
>
> "body_data": null,
>
> "body_data.fild1": "abc",
>
> "body_data.fild2": "cdf"
>
> }
>
> ]
>
>
>
>
> 当格式错误时
>
>
>
>
> 输入:
>
> {
>
> "uuid": "XXXX”,
>
> "body_data": "abc"
>
> }
>
> 输出:
>
> {
>
> "uuid": "XXXX",
>
> "body_data": "abc",
>
> "body_data.fild1": null,
>
> "body_data.fild2": null
>
> }