Hi,

Got it. It seems this way is not flexible enough, but still thanks so much for your great support! Good wishes!

Regards && Thanks
Hunk

At 2022-04-02 16:34:29, "Qingsheng Ren" <renqs...@gmail.com> wrote:
> Hi,
>
> If the schema of records is not fixed, I'm afraid you have to do it in a UDTF.
>
> Best,
>
> Qingsheng
>
>> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
>>
>> Hi,
>>
>> Thanks for your quick response!
>>
>> I tried the "raw" format; it seems to support only a single physical
>> column, and in our project requirement there are more than one hundred
>> columns in the sink table. So I need to combine those columns into one
>> string in a single UDF?
>>
>> Thanks && Regards,
>> Hunk
>>
>> At 2022-04-02 15:17:14, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>>> Hi,
>>>
>>> You can construct the final json string in your UDTF and write it to the
>>> Kafka sink table with only one field, which is the entire json string
>>> constructed in the UDTF, using the raw format [1] in the sink table:
>>>
>>> CREATE TABLE TableSink (
>>>     `final_json_string` STRING
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'format' = 'raw',
>>>     ...
>>> )
>>>
>>> So the entire flow would be:
>>>
>>> 1. The Kafka source table reads data.
>>> 2. The UDTF parses the 'content' field and constructs the final json
>>>    string you need (id, content without backslashes), e.g. using
>>>    Jackson [2] or other json tools.
>>> 3. Insert the constructed json string as the only field in the sink table.
>>>
>>> The key problem is that the schema of the field "content" is not fixed,
>>> so you have to implement most of the logic in the UDTF.
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
>>> [2] https://github.com/FasterXML/jackson
>>>
>>> Best regards,
>>>
>>> Qingsheng
>>>
>>>> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thanks so much for your support!
>>>>
>>>> But sorry to say I'm still confused about it.
>>>> No matter what the UDF looks like, the first thing I need to confirm is
>>>> the type of 'content' in TableSink. What should its type be? Should I
>>>> use type ROW, like this?
>>>>
>>>> CREATE TABLE TableSink (
>>>>     `id` STRING NOT NULL,
>>>>     `content` ROW<name STRING, age BIGINT>
>>>> ) WITH (
>>>>     ...
>>>> );
>>>>
>>>> This type is only suitable for the source input
>>>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>>>> "{\"name\":\"Jone\",\"age\":20}"}}
>>>>
>>>> But the json key names and the format of 'content' in the source are
>>>> variable. If the source input is
>>>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>>>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>>>>
>>>> then I should define `content` in TableSink with type ROW<color STRING,
>>>> BackgroundColor STRING, Height BIGINT>, like this:
>>>>
>>>> CREATE TABLE TableSink (
>>>>     `id` STRING NOT NULL,
>>>>     `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>>>> ) WITH (
>>>>     ...
>>>> );
>>>>
>>>> And the input json might also contain a json array, like:
>>>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>>>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}
>>>>
>>>> So is there some common type I can use which can handle all input json
>>>> formats?
>>>>
>>>> Thanks so much!!
>>>>
>>>> Thanks && Regards,
>>>> Hunk
>>>>
>>>> At 2022-04-01 17:25:59, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> I'm afraid you have to use a UDTF to parse the content and construct
>>>>> the final json string manually. The key problem is that the field
>>>>> "content" is actually a STRING, although it looks like a json object.
>>>>> Currently the json format provided by Flink cannot handle this kind of
>>>>> field defined as STRING. Also, considering the schema of this "content"
>>>>> field is not fixed across records, Flink SQL can't use one DDL to
>>>>> consume / produce records with a changing schema.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Qingsheng
>>>>>
>>>>>> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>>>>>>
>>>>>> Hi dear engineer,
>>>>>>
>>>>>> Thanks so much for your precious time reading my email!
>>>>>> Recently I have been working with Flink SQL (version 1.13) in my
>>>>>> project and encountered a problem with json format data; hope you can
>>>>>> take a look, thanks! Below is the description of my issue.
>>>>>>
>>>>>> I use Kafka as source and sink. I define the Kafka source table like
>>>>>> this:
>>>>>>
>>>>>> CREATE TABLE TableSource (
>>>>>>     schema STRING,
>>>>>>     payload ROW(
>>>>>>         `id` STRING,
>>>>>>         `content` STRING
>>>>>>     )
>>>>>> ) WITH (
>>>>>>     'connector' = 'kafka',
>>>>>>     'topic' = 'topic_source',
>>>>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>>>>     'properties.group.id' = 'all_gp',
>>>>>>     'scan.startup.mode' = 'group-offsets',
>>>>>>     'format' = 'json',
>>>>>>     'json.fail-on-missing-field' = 'false',
>>>>>>     'json.ignore-parse-errors' = 'true'
>>>>>> );
>>>>>>
>>>>>> Define the Kafka sink table like this:
>>>>>>
>>>>>> CREATE TABLE TableSink (
>>>>>>     `id` STRING NOT NULL,
>>>>>>     `content` STRING NOT NULL
>>>>>> ) WITH (
>>>>>>     'connector' = 'kafka',
>>>>>>     'topic' = 'topic_sink',
>>>>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>>>>     'format' = 'json',
>>>>>>     'json.fail-on-missing-field' = 'false',
>>>>>>     'json.ignore-parse-errors' = 'true'
>>>>>> );
>>>>>>
>>>>>> Then insert into TableSink with data from TableSource:
>>>>>>
>>>>>> INSERT INTO TableSink SELECT id, content FROM TableSource;
>>>>>> Then I use "kafka-console-producer.sh" to produce the data below into
>>>>>> topic "topic_source" (TableSource):
>>>>>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>>>>>> "{\"name\":\"Jone\",\"age\":20}"}}
>>>>>>
>>>>>> Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh;
>>>>>> the output is:
>>>>>> {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>>>>>>
>>>>>> But what I want here is {"id":"10000","content":{"name":"Jone","age":20}}
>>>>>> I want the value of "content" to be a json object, not a json string.
>>>>>>
>>>>>> What's more, the format of "content" in TableSource is not fixed; it
>>>>>> can be any json formatted (or json array formatted) string, such as:
>>>>>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>>>>>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>>>>>>
>>>>>> So my question is: how can I transform a json format string (like
>>>>>> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object
>>>>>> (like {"name":"Jone","age":20})?
>>>>>>
>>>>>> Thanks && Regards,
>>>>>> Hunk
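Putting the thread's advice together: once Flink's json format has deserialized the source record, the `content` STRING field already holds unescaped json text, so the UDTF's main job is to embed that text into the output as an object instead of letting the sink's json format re-escape it. Below is a minimal sketch of that logic in Python rather than the Java/Scala a real Flink UDTF would be written in; `build_final_json` is a hypothetical helper name, and a real UDTF would do the equivalent with Jackson as suggested in the thread.

```python
import json

def build_final_json(id_value: str, content: str) -> str:
    """Sketch of the UDTF logic: re-embed the already-parsed 'content'
    string (which holds a JSON text) as a JSON object, not as an
    escaped string."""
    # json.loads validates that `content` really is JSON and works for
    # any shape (object, array, nested), so no fixed schema is needed.
    parsed = json.loads(content)
    return json.dumps({"id": id_value, "content": parsed},
                      separators=(",", ":"))

# After the source's json format deserializes the record, the `content`
# STRING field holds the unescaped text:
content = '{"name":"Jone","age":20}'
print(build_final_json("10000", content))
# {"id":"10000","content":{"name":"Jone","age":20}}
```

Because the raw format writes the single STRING field verbatim, the constructed text would reach topic_sink without backslash escaping, and the same logic handles records whose `content` is an object, an array, or any nesting of the two.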