Thank you, Ran, for the explanation. The column DEFAULT is a reasonable feature and can also help in other cases. I’m fine with adding this feature. Do you want to prepare a FLIP for it?

Best,
Jark
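(For context, a rough sketch of one way a column DEFAULT could serve the metadata use case discussed in the quoted thread below. The syntax is purely illustrative and would only be settled by the FLIP; `host_name`, the topic, and the connector options are placeholders.)

```
CREATE TABLE KafkaTable (
  -- hypothetical syntax: if the connector does not expose a 'host_name'
  -- metadata key, the column would fall back to the declared default
  -- instead of failing validation
  host_name STRING METADATA VIRTUAL DEFAULT 'unknown',
  user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```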
> On Aug 29, 2022, at 15:02, Ran Tao <chucheng...@gmail.com> wrote:
>
> Hi Jark. Timo summed it up very well. In fact, my problem is that the
> current Flink table metadata is fixed and cannot accommodate changes in a
> connector's metadata columns. A metadata column that did not exist in the
> past may exist at some point in the future, and vice versa. There is
> forward and backward compatibility involved here.
>
> Jark Wu <imj...@gmail.com> wrote on Fri, Aug 26, 2022 at 16:28:
>>
>> Hi Ran,
>>
>> If the metadata comes from the message properties, then you can manually
>> cast it to your preferred type, such as
>> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
>>
>> If the metadata does not come from the message properties, how does the
>> connector know which field to convert from? Shouldn't the connector be
>> modified to support this new metadata column?
>>
>> Best,
>> Jark
>>
>>> On Aug 26, 2022, at 15:30, Ran Tao <chucheng...@gmail.com> wrote:
>>>
>>> Hi, Timo. I think the single map column in the Debezium format you
>>> illustrated above can't cover the discussed scenario. It's not the same
>>> thing.
>>>
>>> Here is a Debezium format example from the Flink docs: [1]
>>>
>>> ```
>>> CREATE TABLE KafkaTable (
>>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>>>   origin_properties MAP<STRING, STRING> METADATA FROM
>>>     'value.source.properties' VIRTUAL,
>>>   user_id BIGINT
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'value.format' = 'debezium-json'
>>>   ...
>>> );
>>> ```
>>>
>>> The `origin_properties` column is used for the properties, so we define
>>> it as a MAP (just as you suggested). But the other metadata columns have
>>> their own data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten all
>>> metadata columns into one MAP<STRING, STRING> column; it's not a good idea.
>>>
>>> My suggestion: suppose the Kafka connector above *adds some new metadata*
>>> (just as an example; Kafka may be stable, but a certain connector or
>>> middleware might still be evolving, so its metadata could be added or
>>> changed), e.g. at some point Kafka adds a `host_name` metadata key
>>> (indicating the address of the message broker).
>>>
>>> We could then define the SQL like this:
>>> ```
>>> CREATE TABLE KafkaTable (
>>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>>>   host_name STRING METADATA VIRTUAL DYNAMIC,
>>>   origin_properties MAP<STRING, STRING> METADATA FROM
>>>     'value.source.properties' VIRTUAL,
>>>   user_id BIGINT
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'value.format' = 'debezium-json'
>>>   ...
>>> );
>>> ```
>>> Users can then use the `host_name` metadata: because it is a DYNAMIC
>>> metadata column, Flink doesn't throw an exception even though `host_name`
>>> did not belong to Kafka before, and developers don't need to modify,
>>> rebuild, and redeploy the Flink source code to their online environment
>>> (which comes at a high cost).
>>>
>>> Considering the return value:
>>> Kafka before (without this metadata): NULL
>>> Kafka now (with this metadata added): the concrete value
>>>
>>> The same user SQL keeps working in the past, now, and in the future,
>>> rather than having these new metadata columns checked and rejected, or
>>> having to modify the connector implementation frequently to support them.
>>> And it is opt-in, configured by marking the metadata column as 'DYNAMIC'
>>> (or via some better implementation).
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
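(A sketch of Jark's casting workaround above, spelled out against the documented `value.source.properties` metadata from [1]. The `my-new-property` key is his placeholder, the connector options are placeholders, and it assumes the property arrives as a timestamp-formatted string and that the computed column may reference the metadata column.)

```
CREATE TABLE KafkaTable (
  -- documented Debezium metadata: all source properties as a string map
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  -- computed column: pick one property out of the map and cast it;
  -- map access yields NULL when the key is absent, so the cast result is NULL too
  my_dynamic_meta AS CAST(origin_properties['my-new-property'] AS TIMESTAMP(3)),
  user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'debezium-json'
);
```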
>>>
>>> Timo Walther <twal...@apache.org> wrote on Thu, Aug 25, 2022 at 21:07:
>>>
>>>> Hi Ran,
>>>>
>>>> What would be the data type of this dynamic metadata column? The planner
>>>> and many parts of the stack will require a data type.
>>>>
>>>> Personally, I feel connector developers can already have the same
>>>> functionality by declaring a metadata column as `MAP<STRING, STRING>`.
>>>> This is what we already expose as `debezium.source.properties`. Whatever
>>>> Debezium adds will be available through this property and can be
>>>> accessed via `SELECT col['my-new-property'] FROM x`, including being NULL
>>>> by default if not present.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 25.08.22 14:04, Ran Tao wrote:
>>>>> ```
>>>>> create table test_source(
>>>>>   __test_metadata__ varchar METADATA,
>>>>>   f0 varchar,
>>>>>   f1 varchar,
>>>>>   f2 bigint,
>>>>>   ts as CURRENT_TIMESTAMP
>>>>> ) with(
>>>>>   'connector' = 'test',
>>>>>   ...
>>>>> )
>>>>> ```
>>>>>
>>>>> If we do not pre-define `__test_metadata__` as a metadata key by
>>>>> implementing `listReadableMetadata`, running the above SQL causes an
>>>>> exception like this:
>>>>>
>>>>> org.apache.flink.table.api.ValidationException: Invalid metadata key
>>>>> '__test_metadata__' in column '__test_metadata__' of table
>>>>> 'default_catalog.default_database.test_source'. The DynamicTableSource
>>>>> class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
>>>>> supports the following metadata keys for reading:
>>>>> xxx, yyy
>>>>>
>>>>> at
>>>>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
>>>>>
>>>>> This happens because a metadata column currently must exist in the
>>>>> results returned by `listReadableMetadata`. So when a certain connector
>>>>> adds some metadata, we cannot use it directly unless we modify the
>>>>> connector code to support it. In some situations this is intolerable.
>>>>> Can we support a 'DYNAMIC MetadataColumn'? Its basic mechanism is not to
>>>>> check the column against the existing metadata, so users can define it
>>>>> dynamically. If a certain connector does not have this metadata, the
>>>>> column returns NULL; otherwise it returns the concrete value. This has
>>>>> great benefits in some scenarios.
>>>>>
>>>>> Looking forward to your opinions.
>>>
>>> --
>>> Best Regards,
>>> Ran Tao
>>> https://github.com/chucheng92
>
> --
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
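(A sketch of the query-time access Timo describes above, against a table that declares `origin_properties MAP<STRING, STRING>` as in the earlier examples. Indexing the map with a key the source never emitted simply yields NULL rather than a validation error; `my-new-property` remains a placeholder key.)

```
-- Map access by key; rows written before the property existed return NULL.
SELECT
  user_id,
  origin_properties['my-new-property'] AS my_new_property
FROM KafkaTable;
```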