because  the format of "content" in TableSource is not fixed,

It may be a good way to deal with it through UDF













--

Best regards,
Mang Zhang





At 2022-03-31 21:42:50, "wang" <24248...@163.com> wrote:
>Hi dear engineer,
>
>
>Thanks so much for your precious time reading my email!
>Resently I'm working on the Flink sql (with version 1.13) in my project and 
>encountered one problem about json format data, hope you can take a look, 
>thanks! Below is the description of my issue.
>
>
>I use kafka as source and sink, I define kafka source table like this:
>
>
>     CREATE TABLE TableSource (
>          schema STRING,
>          payload ROW(
>              `id` STRING,
>              `content` STRING
>         )
>     )
>     WITH (
>         'connector' = 'kafka',
>         'topic' = 'topic_source',
>         'properties.bootstrap.servers' = 'localhost:9092',
>         'properties.group.id' = 'all_gp',
>         'scan.startup.mode' = 'group-offsets',
>         'format' = 'json',
>         'json.fail-on-missing-field' = 'false',
>         'json.ignore-parse-errors' = 'true'
>     );
>
>
>Define the kafka sink table like this:
>
>
>     CREATE TABLE TableSink (
>          `id` STRING NOT NULL,
>          `content` STRING NOT NULL
>     )
>     WITH (
>         'connector' = 'kafka',
>         'topic' = 'topic_sink',
>         'properties.bootstrap.servers' = 'localhost:9092',
>         'format' = 'json',
>         'json.fail-on-missing-field' = 'false',
>         'json.ignore-parse-errors' = 'true'
>    );
>
>
>
>
>Then insert into TableSink with data from TableSource:
>    INSERT INTO TableSink SELECT id, content FROM TableSource;
>
>
>Then I use "kafka-console-producer.sh" to produce data below into topic 
>"topic_source" (TableSource):
>    {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> "{\"name\":\"Jone\",\"age\":20}"}}
>
>
>
>
>Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
>output is:
>    {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>
>
>But what I want here is {"id":"10000","content": {"name":"Jone","age":20}}
>I want the the value of "content" is json object, not json string.
>
>
>And what's more, the format of "content" in TableSource is not fixed, it can 
>be any json formated(or json array format) string, such as:
>{"schema": "schema_infos", "payload": {"id": "10000", "content": 
>"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
>
>
>
>
>So my question is, how can I transform json format string(like 
>"{\"name\":\"Jone\",\"age\":20}")  from TableSource to json object 
>(like{"name":"Jone","age":20} ).
>
>
>
>
>Thanks && Regards,
>Hunk
>

回复