Hi Jark. Timo summed it up very well. In fact, my problem is that the set of metadata keys a Flink table can use is currently fixed and cannot stay compatible with changes to a connector's metadata columns: a metadata column that did not exist in the past may exist at some point in the future, and vice versa. There is a forward and backward compatibility concern here.
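To make that concrete, here is a minimal sketch of the intended semantics (the DYNAMIC keyword below is the proposal itself, not existing Flink syntax; the `host_name` key and the connector options are placeholders):

```
-- Sketch only: DYNAMIC is proposed syntax, not available in Flink today.
CREATE TABLE KafkaTable (
    -- a DYNAMIC column would not be validated against the connector's
    -- listReadableMetadata() result at planning time
    host_name STRING METADATA VIRTUAL DYNAMIC,
    user_id BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'value.format' = 'debezium-json'
);

-- Backward compatibility: a connector version without 'host_name' returns NULL.
-- Forward compatibility: once the connector exposes 'host_name', the same query
-- returns the concrete value, with no DDL change and no connector rebuild.
SELECT host_name, user_id FROM KafkaTable;
```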
Jark Wu <imj...@gmail.com> wrote on Fri, 26 Aug 2022 at 16:28:

> Hi Ran,
>
> If the metadata is from the message properties, then you can manually
> cast it to your preferred types, such as
> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
>
> If the metadata is not from the message properties, how does the
> connector know which field to convert from? Shouldn't the connector be
> modified to support this new metadata column?
>
> Best,
> Jark
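For reference, Jark's cast pattern written out as a complete DDL might look like the sketch below. It reuses the `value.source.properties` metadata key from the Debezium docs example quoted further down; the topic, group id, and server address are placeholders, and it assumes a computed column may reference a metadata column:

```
CREATE TABLE KafkaTable (
    -- map-typed metadata column: exposes whatever source properties the format carries
    origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
    -- computed column over the map: the lookup yields NULL when the key is absent,
    -- and CAST(NULL AS ...) stays NULL, so the DDL tolerates a connector that
    -- does not (yet) emit 'my-new-property'
    my_dynamic_meta AS CAST(origin_properties['my-new-property'] AS TIMESTAMP(3)),
    user_id BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'debezium-json'
);
```

Timo's suggestion further down is the same pattern on the query side: `SELECT origin_properties['my-new-property'] FROM KafkaTable` likewise degrades to NULL instead of failing validation.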
> > On 26 Aug 2022 at 15:30, Ran Tao <chucheng...@gmail.com> wrote:
> >
> > Hi, Timo. I think using one map column in the Debezium format you
> > illustrated above can't cover the discussed scenario.
> > It's not the same thing.
> >
> > Here is a Debezium format example from the Flink docs: [1]
> >
> > ```
> > CREATE TABLE KafkaTable (
> >   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >   user_id BIGINT
> > ) WITH (
> >   'connector' = 'kafka',
> >   'value.format' = 'debezium-json'
> >   ...
> > );
> > ```
> >
> > The `origin_properties` column is used for properties, so we define it
> > as a MAP (just as you responded). But the other metadata columns have
> > their own data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten
> > all metadata columns into one MAP<STRING, STRING> column; that is not a
> > good idea.
> >
> > My suggestion concerns the case where the connector above *adds some
> > new metadata* (just as an example; Kafka itself may be stable, but a
> > certain connector or middleware might still be evolving, so its
> > metadata could be added or changed). E.g., suppose at some point Kafka
> > added a `host_name` metadata key (indicating the address of the message
> > broker).
> >
> > We could then define the SQL like this:
> > ```
> > CREATE TABLE KafkaTable (
> >   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >   host_name STRING METADATA VIRTUAL DYNAMIC,
> >   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >   user_id BIGINT
> > ) WITH (
> >   'connector' = 'kafka',
> >   'value.format' = 'debezium-json'
> >   ...
> > );
> > ```
> > Then users can use the `host_name` metadata: because it is a DYNAMIC
> > metadata column, Flink doesn't throw an exception even though
> > `host_name` did not belong to Kafka before, and developers don't need
> > to modify or rebuild the Flink source code and republish Flink to the
> > online environment (which comes at a high cost).
> >
> > Considering the return value:
> > Kafka before (no such metadata): NULL
> > Kafka now (metadata already added): the concrete value
> >
> > The same user SQL keeps working in the past, now, and even in the
> > future, rather than Flink checking and rejecting these new metadata
> > columns, or the connector implementation being modified frequently to
> > support them. And it is opt-in, configured by placing 'DYNAMIC' on the
> > metadata column (or some better implementation).
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> >
> > Timo Walther <twal...@apache.org> wrote on Thu, 25 Aug 2022 at 21:07:
> >
> >> Hi Ran,
> >>
> >> what would be the data type of this dynamic metadata column? The
> >> planner and many parts of the stack will require a data type.
> >>
> >> Personally, I feel connector developers can already have the same
> >> functionality by declaring a metadata column as `MAP<STRING, STRING>`.
> >> This is what we expose already as `debezium.source.properties`.
> >> Whatever Debezium adds will be available through this property and can
> >> be accessed via `SELECT col['my-new-property'] FROM x`, including
> >> being NULL by default if not present.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 25.08.22 14:04, Ran Tao wrote:
> >>> ```
> >>> create table test_source(
> >>>   __test_metadata__ varchar METADATA,
> >>>   f0 varchar,
> >>>   f1 varchar,
> >>>   f2 bigint,
> >>>   ts as CURRENT_TIMESTAMP
> >>> ) with (
> >>>   'connector' = 'test',
> >>>   ...
> >>> )
> >>> ```
> >>>
> >>> If we do not pre-define `__test_metadata__` among the metadata keys
> >>> returned by `listReadableMetadata` and run the above SQL, it causes
> >>> an exception like this:
> >>>
> >>> org.apache.flink.table.api.ValidationException: Invalid metadata key
> >>> '__test_metadata__' in column '__test_metadata__' of table
> >>> 'default_catalog.default_database.test_source'. The DynamicTableSource
> >>> class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> >>> supports the following metadata keys for reading:
> >>> xxx, yyy
> >>>
> >>>   at org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> >>>
> >>> Currently a Flink metadata column must exist in the results returned
> >>> by `listReadableMetadata`, so when a certain connector adds some
> >>> metadata we cannot use it directly unless we modify the connector
> >>> code to support it. In some situations this is intolerable. Can we
> >>> support a 'DYNAMIC metadata column'? Its basic mechanism is to skip
> >>> validating the column against the existing metadata keys, so users
> >>> can define it dynamically. If a certain connector lacks this
> >>> metadata, the column value returns NULL; otherwise it returns the
> >>> concrete value. This has great benefits in some scenarios.
> >>>
> >>> Looking forward to your opinions.
> >
> > --
> > Best Regards,
> > Ran Tao
> > https://github.com/chucheng92

--
Best Regards,
Ran Tao
https://github.com/chucheng92