because the format of "content" in TableSource is not fixed,
It may be a good way to deal with it through UDF -- Best regards, Mang Zhang At 2022-03-31 21:42:50, "wang" <24248...@163.com> wrote: >Hi dear engineer, > > >Thanks so much for your precious time reading my email! >Resently I'm working on the Flink sql (with version 1.13) in my project and >encountered one problem about json format data, hope you can take a look, >thanks! Below is the description of my issue. > > >I use kafka as source and sink, I define kafka source table like this: > > > CREATE TABLE TableSource ( > schema STRING, > payload ROW( > `id` STRING, > `content` STRING > ) > ) > WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_source', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'all_gp', > 'scan.startup.mode' = 'group-offsets', > 'format' = 'json', > 'json.fail-on-missing-field' = 'false', > 'json.ignore-parse-errors' = 'true' > ); > > >Define the kafka sink table like this: > > > CREATE TABLE TableSink ( > `id` STRING NOT NULL, > `content` STRING NOT NULL > ) > WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_sink', > 'properties.bootstrap.servers' = 'localhost:9092', > 'format' = 'json', > 'json.fail-on-missing-field' = 'false', > 'json.ignore-parse-errors' = 'true' > ); > > > > >Then insert into TableSink with data from TableSource: > INSERT INTO TableSink SELECT id, content FROM TableSource; > > >Then I use "kafka-console-producer.sh" to produce data below into topic >"topic_source" (TableSource): > {"schema": "schema_infos", "payload": {"id": "10000", "content": > "{\"name\":\"Jone\",\"age\":20}"}} > > > > >Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the >output is: > {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"} > > >But what I want here is {"id":"10000","content": {"name":"Jone","age":20}} >I want the the value of "content" is json object, not json string. > > >And what's more, the format of "content" in TableSource is not fixed, it can >be any json formated(or json array format) string, such as: >{"schema": "schema_infos", "payload": {"id": "10000", "content": >"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}} > > > > >So my question is, how can I transform json format string(like >"{\"name\":\"Jone\",\"age\":20}") from TableSource to json object >(like{"name":"Jone","age":20} ). > > > > >Thanks && Regards, >Hunk >