Hi,
Thanks so much for your support!
But sorry to say I'm still confused about it. No matter what the udf looks
like, the first thing I need confirm is the type of 'content' in TableSink,
what's the type of it should be, should I use type Row, like this?
CREATE TABLE TableSink (
`id` STRING NOT NULL,
`content` ROW<name STRING, age BIGINT>
)
WITH (
...
);
This type is only suitable for source input {"schema": "schema_infos",
"payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
But the json key name and format of 'content' in source is variable, if the
source input is
{"schema": "schema_infos", "payload": {"id": "10000", "content":
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
I should define `content` in TableSink with type `content` ROW<color STRING,
BackgroundColor STRING, Height BIGINT>, like this:
CREATE TABLE TableSink (
`id` STRING NOT NULL,
`content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
)
WITH (
...
);
And in input json also might contains json array, like:
{"schema": "schema_infos", "payload": {"id": "10000", "content":
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
So is there some common type I can use which can handle all input json formats?
Thanks so much!!
Thanks && Regards,
Hunk
At 2022-04-01 17:25:59, "Qingsheng Ren" <[email protected]> wrote:
>Hi,
>
>I’m afraid you have to use a UDTF to parse the content and construct the final
>json string manually. The key problem is that the field “content” is actually
>a STRING, although it looks like a json object. Currently the json format
>provided by Flink could not handle this kind of field defined as STRING. Also
>considering the schema of this “content” field is not fixed across records,
>Flink SQL can’t use one DDL to consume / produce records with changing schema.
>
>Cheers,
>
>Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <[email protected]> wrote:
>>
>> Hi dear engineer,
>>
>> Thanks so much for your precious time reading my email!
>> Resently I'm working on the Flink sql (with version 1.13) in my project and
>> encountered one problem about json format data, hope you can take a look,
>> thanks! Below is the description of my issue.
>>
>> I use kafka as source and sink, I define kafka source table like this:
>>
>> CREATE TABLE TableSource (
>> schema STRING,
>> payload ROW(
>> `id` STRING,
>> `content` STRING
>> )
>> )
>> WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'topic_source',
>> 'properties.bootstrap.servers' = 'localhost:9092',
>> 'properties.group.id' = 'all_gp',
>> 'scan.startup.mode' = 'group-offsets',
>> 'format' = 'json',
>> 'json.fail-on-missing-field' = 'false',
>> 'json.ignore-parse-errors' = 'true'
>> );
>>
>> Define the kafka sink table like this:
>>
>> CREATE TABLE TableSink (
>> `id` STRING NOT NULL,
>> `content` STRING NOT NULL
>> )
>> WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'topic_sink',
>> 'properties.bootstrap.servers' = 'localhost:9092',
>> 'format' = 'json',
>> 'json.fail-on-missing-field' = 'false',
>> 'json.ignore-parse-errors' = 'true'
>> );
>>
>>
>> Then insert into TableSink with data from TableSource:
>> INSERT INTO TableSink SELECT id, content FROM TableSource;
>>
>> Then I use "kafka-console-producer.sh" to produce data below into topic
>> "topic_source" (TableSource):
>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>> "{\"name\":\"Jone\",\"age\":20}"}}
>>
>>
>> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the
>> output is:
>> {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>>
>> But what I want here is {"id":"10000","content": {"name":"Jone","age":20}}
>> I want the the value of "content" is json object, not json string.
>>
>> And what's more, the format of "content" in TableSource is not fixed, it can
>> be any json formated(or json array format) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "10000", "content":
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
>>
>>
>> So my question is, how can I transform json format string(like
>> "{\"name\":\"Jone\",\"age\":20}") from TableSource to json object
>> (like{"name":"Jone","age":20} ).
>>
>>
>> Thanks && Regards,
>> Hunk
>>
>>
>>
>>
>>