Thank you, Ran, for the explanation.

The column DEFAULT is a reasonable feature and can also help in other cases. 
I’m fine with adding this feature. 
Do you want to prepare a FLIP for it?

Best,
Jark

> 2022年8月29日 15:02,Ran Tao <chucheng...@gmail.com> 写道:
> 
> Hi Jark. Timo summed it up very well. In fact, my problem is that the
> current Flink table metadata is fixed and cannot keep up with a
> connector's changes in its metadata columns.
> A metadata column that did not exist in the past may exist at some point
> in the future, and vice versa.
> There is a forward and backward compatibility concern here.
> 
> Jark Wu <imj...@gmail.com> 于2022年8月26日周五 16:28写道:
> 
>> Hi Ran,
>> 
>> If the metadata is from the message properties, then you can manually cast
>> it to your preferred type, for example
>> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
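>> 
>> For illustration only, a minimal sketch of this approach; the table name,
>> property key, and computed-column name here are just placeholders:
>> 
>> ```
>> CREATE TABLE KafkaTable (
>>   properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
>>   -- computed column: cast the (possibly absent) property value to the desired type;
>>   -- a missing key yields NULL, and CAST(NULL ...) stays NULL
>>   my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP),
>>   user_id BIGINT
>> ) WITH (
>>   'connector' = 'kafka',
>>   'value.format' = 'debezium-json'
>>   ...
>> );
>> ```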
>> 
>> If the metadata is not from the message properties, how does the connector
>> know which field to convert from?
>> Shouldn’t the connector be modified to support this new metadata column?
>> 
>> Best,
>> Jark
>> 
>> 
>> 
>>> 2022年8月26日 15:30,Ran Tao <chucheng...@gmail.com> 写道:
>>> 
>>> Hi Timo. I think the single map column in the Debezium format that you
>>> illustrated above can't cover the scenario discussed here.
>>> It's not the same thing.
>>> 
>>> Here is a debezium-json format example from the Flink docs: [1]
>>> 
>>> ```
>>> CREATE TABLE KafkaTable (
>>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>>>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
>>>   user_id BIGINT
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'value.format' = 'debezium-json'
>>>   ...
>>> );
>>> ```
>>> 
>>> The `origin_properties` column is meant for the properties, so we define it
>>> as a MAP (just as you suggested). But the other metadata columns have their
>>> own data types, e.g. `origin_ts` is TIMESTAMP(3). We cannot flatten all
>>> metadata columns into one MAP<STRING, STRING> column; it's not a good idea.
>>> 
>>> My suggestion covers the case where Kafka above *adds some new metadata*
>>> (just as an example; Kafka may be stable, but a certain connector or
>>> middleware might still be evolving, so its metadata could be added or changed).
>>> Say that at some point Kafka adds a `host_name` metadata key (indicating the
>>> address of the message broker).
>>> 
>>> We can define the SQL like this:
>>> ```
>>> CREATE TABLE KafkaTable (
>>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>>>   host_name STRING METADATA VIRTUAL DYNAMIC,
>>>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
>>>   user_id BIGINT
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'value.format' = 'debezium-json'
>>>   ...
>>> );
>>> ```
>>> Then users can use the `host_name` metadata. Because it is a DYNAMIC
>>> metadata column, Flink does not throw an exception even though `host_name`
>>> did not belong to Kafka before, and developers don't need to modify or
>>> rebuild the Flink source code and publish Flink to the online environment
>>> (which comes at a high cost).
>>> 
>>> Considering the return value:
>>> Kafka before (without this metadata): NULL
>>> Kafka now (this metadata already added): the concrete value
>>> 
>>> The same user SQL works well in the past, now, and even in the future,
>>> rather than checking and rejecting these new metadata columns or modifying
>>> the connector implementation frequently to support them.
>>> And it is opt-in, configured by marking the metadata column with 'DYNAMIC'
>>> (or some other, better implementation).
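>>> 
>>> For illustration only, assuming the proposed DYNAMIC behavior (a metadata
>>> key the connector does not know simply yields NULL), the same query could
>>> run against both the old and the new connector version:
>>> 
>>> ```
>>> -- host_name is NULL on connectors that do not expose this metadata yet,
>>> -- and the broker address once the connector starts providing it
>>> SELECT
>>>   user_id,
>>>   COALESCE(host_name, 'unknown') AS broker_host
>>> FROM KafkaTable;
>>> ```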
>>> 
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
>>> 
>>> Timo Walther <twal...@apache.org> 于2022年8月25日周四 21:07写道:
>>> 
>>>> Hi Ran,
>>>> 
>>>> what would be the data type of this dynamic metadata column? The planner
>>>> and many parts of the stack will require a data type.
>>>> 
>>>> Personally, I feel connector developers can already get the same
>>>> functionality by declaring a metadata column as `MAP<STRING, STRING>`.
>>>> This is what we already expose as `debezium.source.properties`. Whatever
>>>> Debezium adds will be available through this property and can be
>>>> accessed via `SELECT col['my-new-property'] FROM x`, including being NULL
>>>> by default if not present.
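>>>> 
>>>> As a minimal sketch of that access pattern (table and column names are
>>>> only placeholders):
>>>> 
>>>> ```
>>>> -- col is declared as a MAP<STRING, STRING> metadata column on table x;
>>>> -- looking up a key the source does not provide simply returns NULL
>>>> SELECT col['my-new-property'] AS my_new_property
>>>> FROM x;
>>>> ```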
>>>> 
>>>> Regards,
>>>> Timo
>>>> 
>>>> 
>>>> On 25.08.22 14:04, Ran Tao wrote:
>>>>> ```
>>>>> create table test_source (
>>>>>   __test_metadata__ varchar METADATA,
>>>>>   f0 varchar,
>>>>>   f1 varchar,
>>>>>   f2 bigint,
>>>>>   ts as CURRENT_TIMESTAMP
>>>>> ) with (
>>>>>   'connector' = 'test',
>>>>>   ...
>>>>> )
>>>>> ```
>>>>> 
>>>>> If we do not pre-define `__test_metadata__` as a metadata key by
>>>>> implementing listReadableMetadata and run the above SQL, it causes an
>>>>> exception like this:
>>>>> 
>>>>> org.apache.flink.table.api.ValidationException: Invalid metadata key
>>>>> '__test_metadata__' in column '__test_metadata__' of table
>>>>> 'default_catalog.default_database.test_source'. The DynamicTableSource
>>>>> class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
>>>>> supports the following metadata keys for reading:
>>>>> xxx, yyy
>>>>> 
>>>>> at
>>>>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
>>>>> 
>>>>> This happens because, currently, a Flink metadata column must exist in the
>>>>> results returned by `listReadableMetadata`. When a certain connector adds
>>>>> some new metadata, we cannot use it directly unless we modify the connector
>>>>> code to support it. In some situations that is intolerable. Can we support a
>>>>> 'DYNAMIC MetadataColumn'? Its basic mechanism is to skip the check against
>>>>> the existing metadata keys so that users can define the column dynamically.
>>>>> If a certain connector does not have this metadata, the column value is
>>>>> NULL; otherwise it is the concrete value. It has great benefits in some
>>>>> scenarios.
>>>>> 
>>>>> Looking forward to your opinions.
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> --
>>> Best Regards,
>>> Ran Tao
>>> https://github.com/chucheng92
>> 
>> 
> 
> -- 
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
