Hi,
Thanks so much for your support! But I'm sorry to say I'm still confused. Whatever the UDF looks like, the first thing I need to confirm is the type of `content` in TableSink. What type should it be? Should I use type ROW, like this?

CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<name STRING, age BIGINT>
)
WITH (
  ...
);

This type only suits source input like:
{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the JSON key names and the format of `content` in the source vary. If the source input is
{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}

then I would have to define `content` in TableSink as ROW<color STRING, BackgroundColor STRING, Height BIGINT>, like this:

CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
)
WITH (
  ...
);

The input JSON might also contain a JSON array, like:
{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}

So is there a common type I can use that handles all of these input JSON formats?

Thanks so much!!

Thanks && Regards,
Hunk

At 2022-04-01 17:25:59, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>Hi,
>
>I’m afraid you have to use a UDTF to parse the content and construct the final
>json string manually. The key problem is that the field “content” is actually
>a STRING, although it looks like a json object. Currently the json format
>provided by Flink cannot handle this kind of field defined as STRING. Also,
>considering the schema of this “content” field is not fixed across records,
>Flink SQL can't use one DDL to consume / produce records with changing schema.
>
>Cheers,
>
>Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>>
>> Hi dear engineer,
>>
>> Thanks so much for your precious time reading my email!
>> Recently I've been working with Flink SQL (version 1.13) in my project and
>> ran into a problem with JSON-format data. I hope you can take a look,
>> thanks! Below is the description of my issue.
>>
>> I use Kafka as both source and sink. I define the Kafka source table like this:
>>
>> CREATE TABLE TableSource (
>>   schema STRING,
>>   payload ROW(
>>     `id` STRING,
>>     `content` STRING
>>   )
>> )
>> WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic_source',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'all_gp',
>>   'scan.startup.mode' = 'group-offsets',
>>   'format' = 'json',
>>   'json.fail-on-missing-field' = 'false',
>>   'json.ignore-parse-errors' = 'true'
>> );
>>
>> and the Kafka sink table like this:
>>
>> CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` STRING NOT NULL
>> )
>> WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic_sink',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'format' = 'json',
>>   'json.fail-on-missing-field' = 'false',
>>   'json.ignore-parse-errors' = 'true'
>> );
>>
>> Then I insert into TableSink with data from TableSource (`id` and `content`
>> are nested inside `payload`):
>> INSERT INTO TableSink SELECT payload.id, payload.content FROM TableSource;
>>
>> Then I use "kafka-console-producer.sh" to produce the data below into topic
>> "topic_source" (TableSource):
>> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>>
>> Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh, and the
>> output is:
>> {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>>
>> But what I want here is {"id":"10000","content":{"name":"Jone","age":20}}
>> I want the value of "content" to be a JSON object, not a JSON string.
>>
>> What's more, the format of "content" in TableSource is not fixed; it can
>> be any JSON-formatted (or JSON array-formatted) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>>
>> So my question is: how can I transform a JSON-formatted string (like
>> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object
>> (like {"name":"Jone","age":20})?
>>
>> Thanks && Regards,
>> Hunk
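On the follow-up question about a common type: since no single ROW type fits every record, one option in line with Qingsheng's reply is to keep `content` as a STRING and assemble the sink's JSON text by hand, splicing `content` in without escaping it. The minimal sketch below is written in Python only for illustration. `build_sink_value` is a hypothetical helper, not a Flink API, and the sketch assumes the content text is already valid JSON.

```python
import json


def build_sink_value(record_id: str, content: str) -> str:
    """Return sink JSON text with `content` nested as a JSON value.

    Hypothetical helper for illustration only; it is not part of Flink.
    """
    # Parse only to validate; raises json.JSONDecodeError (a ValueError)
    # when `content` is not well-formed JSON.
    json.loads(content)
    # json.dumps quotes and escapes the id; `content` is spliced in verbatim,
    # so it appears as an object or array rather than as an escaped string.
    return '{"id":' + json.dumps(record_id) + ',"content":' + content + "}"


print(build_sink_value("10000", '{"name":"Jone","age":20}'))
# {"id":"10000","content":{"name":"Jone","age":20}}
print(build_sink_value("10000", '{"detail":[{"value_type":1,"value_name":"AAA"}]}'))
# {"id":"10000","content":{"detail":[{"value_type":1,"value_name":"AAA"}]}}
```

Because `content` is never mapped to a typed ROW, the same STRING field can carry objects, arrays, and any set of keys. In Flink SQL, the assembled text would still have to reach Kafka through a sink format that writes it unchanged, because the `json` format would quote it again. The `raw` format (added in Flink 1.12) is one candidate worth checking against the 1.13 documentation.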
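For the original question (turning the `content` string into a nested JSON object when its shape varies per record), here is a minimal sketch of the per-record logic a UDF could run. It is written in Python only for illustration. `transform` is a hypothetical name, and the field names come from the example records in this thread. Returning `None` for unparseable input is an assumption, meant to loosely resemble `'json.ignore-parse-errors' = 'true'`.

```python
import json
from typing import Optional


def transform(message: str) -> Optional[str]:
    """Map one source message to {"id": ..., "content": <parsed JSON>}.

    Hypothetical per-record logic for a UDF; field names follow the example
    records in this thread. Returns None for records that cannot be parsed.
    """
    try:
        payload = json.loads(message)["payload"]
        # "content" is itself JSON text: parse it into a dict, list, etc.
        content = json.loads(payload["content"])
        record_id = payload["id"]
    except (ValueError, KeyError, TypeError):
        # Skip bad records, loosely like 'json.ignore-parse-errors' = 'true'.
        return None
    # Compact separators reproduce the output style shown in the thread.
    return json.dumps({"id": record_id, "content": content}, separators=(",", ":"))


person = r'{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}'
with_array = r'{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"}]}"}}'
truncated = r'{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"Height\":30"}}'

print(transform(person))      # {"id":"10000","content":{"name":"Jone","age":20}}
print(transform(with_array))  # {"id":"10000","content":{"color":"Red","Height":30,"detail":[{"value_type":1,"value_name":"AAA"}]}}
print(transform(truncated))   # None (the inner "content" is not valid JSON)
```

The parsed `content` is re-serialized whatever its shape, so this step needs no per-shape DDL. What remains is getting the resulting text into Kafka without it being quoted again, which is why Qingsheng suggests constructing the final JSON string manually.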