Hi,
Thanks for your quick response! I tried the "raw" format, but it seems to support only a single physical column, and our project requires more than one hundred columns in the sink table. So do I need to combine those columns into one string in a single UDF?

Thanks && Regards,
Hunk

At 2022-04-02 15:17:14, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>Hi,
>
>You can construct the final json string in your UDTF and write it to the Kafka
>sink table with only one field, which is the entire json string constructed in
>the UDTF, using the raw format [1] in the sink table:
>
>CREATE TABLE TableSink (
>  `final_json_string` STRING
>) WITH (
>  'connector' = 'kafka',
>  'format' = 'raw',
>  ...
>)
>
>So the entire flow would be:
>
>1. The Kafka source table reads data
>2. The UDTF parses the 'content' field and constructs the final json (id, content
>without backslashes) string you need, maybe using Jackson [2] or other json tools
>3. Insert the constructed json string as the only field in the sink table
>
>The key problem is that the schema of the field "content" is not fixed, so you
>have to implement most of the logic in the UDTF.
>
>[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
>[2] https://github.com/FasterXML/jackson
>
>Best regards,
>
>Qingsheng
>
>
>> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
>>
>> Hi,
>>
>> Thanks so much for your support!
>>
>> But sorry to say I'm still confused about it. No matter what the udf looks
>> like, the first thing I need to confirm is the type of 'content' in TableSink.
>> What should its type be? Should I use type Row, like this?
>>
>> CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` ROW<name STRING, age BIGINT>
>> )
>> WITH (
>>   ...
>> );
>>
>> This type is only suitable for the source input {"schema": "schema_infos",
>> "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>>
>> But the json key names and format of 'content' in the source are variable. If the
>> source input is
>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>>
>> then I should define `content` in TableSink with type ROW<color STRING,
>> BackgroundColor STRING, Height BIGINT>, like this:
>>
>> CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>> )
>> WITH (
>>   ...
>> );
>>
>> And the input json might also contain a json array, like:
>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}
>>
>> So is there some common type I can use which can handle all input json
>> formats?
>>
>> Thanks so much!!
>>
>> Thanks && Regards,
>> Hunk
>>
>> At 2022-04-01 17:25:59, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>> >Hi,
>> >
>> >I'm afraid you have to use a UDTF to parse the content and construct the
>> >final json string manually. The key problem is that the field "content" is
>> >actually a STRING, although it looks like a json object. Currently the json
>> >format provided by Flink cannot handle this kind of field defined as
>> >STRING. Also, considering the schema of this "content" field is not fixed
>> >across records, Flink SQL can't use one DDL to consume / produce records
>> >with a changing schema.
>> >
>> >Cheers,
>> >
>> >Qingsheng
>> >
>> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> >>
>> >> Hi dear engineer,
>> >>
>> >> Thanks so much for your precious time reading my email!
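On the "common type" question above: no single ROW<...> type can cover payloads whose keys differ per record, but a generic JSON parser needs no declared schema at all. A minimal sketch in plain Python (the json module standing in for the Jackson-based logic a real Flink UDTF would use in Java) shows that the same parse call handles every shape of 'content' from the examples, including the nested array:

```python
import json

# Three "content" payloads with completely different schemas, as in the
# examples above: different keys, different value types, and a nested array.
samples = [
    '{"name":"Jone","age":20}',
    '{"color":"Red","BackgroundColor":"Yellow","Height":30}',
    '{"color":"Red","detail":[{"value_type":1,"value_name":"AAA"},'
    '{"value_type":2,"value_name":"BBB"}]}',
]

for s in samples:
    obj = json.loads(s)  # generic parse: no schema declared anywhere
    print(type(obj).__name__, obj)
```

This is why the earlier replies push the work into a UDTF: the schema-free parse happens inside the function, and the sink table only ever sees one STRING column.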
>> >> Recently I've been working with Flink SQL (version 1.13) in my project
>> >> and encountered a problem with json format data. Hope you can take a
>> >> look, thanks! Below is the description of my issue.
>> >>
>> >> I use kafka as source and sink. I define the kafka source table like this:
>> >>
>> >> CREATE TABLE TableSource (
>> >>   schema STRING,
>> >>   payload ROW(
>> >>     `id` STRING,
>> >>     `content` STRING
>> >>   )
>> >> )
>> >> WITH (
>> >>   'connector' = 'kafka',
>> >>   'topic' = 'topic_source',
>> >>   'properties.bootstrap.servers' = 'localhost:9092',
>> >>   'properties.group.id' = 'all_gp',
>> >>   'scan.startup.mode' = 'group-offsets',
>> >>   'format' = 'json',
>> >>   'json.fail-on-missing-field' = 'false',
>> >>   'json.ignore-parse-errors' = 'true'
>> >> );
>> >>
>> >> And I define the kafka sink table like this:
>> >>
>> >> CREATE TABLE TableSink (
>> >>   `id` STRING NOT NULL,
>> >>   `content` STRING NOT NULL
>> >> )
>> >> WITH (
>> >>   'connector' = 'kafka',
>> >>   'topic' = 'topic_sink',
>> >>   'properties.bootstrap.servers' = 'localhost:9092',
>> >>   'format' = 'json',
>> >>   'json.fail-on-missing-field' = 'false',
>> >>   'json.ignore-parse-errors' = 'true'
>> >> );
>> >>
>> >> Then I insert into TableSink with data from TableSource:
>> >> INSERT INTO TableSink SELECT id, content FROM TableSource;
>> >>
>> >> Then I use "kafka-console-producer.sh" to produce the data below into topic
>> >> "topic_source" (TableSource):
>> >> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>> >> "{\"name\":\"Jone\",\"age\":20}"}}
>> >>
>> >> Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh; the
>> >> output is:
>> >> {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>> >>
>> >> But what I want here is {"id":"10000","content":{"name":"Jone","age":20}}
>> >> I want the value of "content" to be a json object, not a json string.
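The escaped output observed above follows directly from `content` being declared as STRING: the json sink serializer re-quotes whatever is in that column. A small Python sketch (the json module mirroring what a Jackson-based UDTF would do in Java) shows the difference between embedding the string as-is and parsing it first:

```python
import json

# The payload as the source table sees it: `content` is a plain string.
payload = {"id": "10000", "content": '{"name":"Jone","age":20}'}

# Embedding `content` as a STRING re-escapes it (what the sink emitted):
as_string = json.dumps({"id": payload["id"], "content": payload["content"]})

# Parsing `content` first embeds it as a real json object (what is wanted):
as_object = json.dumps({"id": payload["id"],
                        "content": json.loads(payload["content"])})

print(as_string)  # content stays an escaped string
print(as_object)  # content becomes a nested object
```

The second form is exactly what the raw-format approach achieves: the UDTF builds `as_object` itself, and the sink writes it untouched.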
>> >>
>> >> And what's more, the format of "content" in TableSource is not fixed; it
>> >> can be any json formatted (or json array format) string, such as:
>> >> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>> >>
>> >> So my question is: how can I transform a json format string (like
>> >> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object
>> >> (like {"name":"Jone","age":20})?
>> >>
>> >> Thanks && Regards,
>> >> Hunk
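Putting the whole thread together — the UDTF parses the escaped `content`, rebuilds the record, and emits one string for a raw-format sink — the transformation can be sketched end-to-end in plain Python. The function name is hypothetical, and a real Flink UDTF would implement the same steps in Java with Jackson:

```python
import json

def build_sink_record(source_value: str) -> str:
    """Parse the source record, turn the escaped `content` string into a
    real json value, and return the final json string that would be
    written as the single column of a raw-format sink table."""
    msg = json.loads(source_value)
    payload = msg["payload"]
    content = json.loads(payload["content"])  # works for objects and arrays alike
    return json.dumps({"id": payload["id"], "content": content},
                      separators=(",", ":"))

# The sample record from the thread, with the backslash-escaped content:
src = ('{"schema": "schema_infos", "payload": {"id": "10000", '
       '"content": "{\\"name\\":\\"Jone\\",\\"age\\":20}"}}')
print(build_sink_record(src))  # {"id":"10000","content":{"name":"Jone","age":20}}
```

Because `json.loads` (like Jackson's generic tree model) accepts any shape, the same function covers the variable-schema and json-array cases without any per-schema DDL.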