Hi Jark & Timo,

I'm glad to work on this feature. If you both agree, I'll prepare a FLIP, and then you and other developers can review the specifics.
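For concreteness, the runtime semantics I have in mind for a DYNAMIC metadata column can be sketched in plain Java. The helper below is hypothetical (not an actual Flink API); it only illustrates the proposed contract: no upfront key validation, value if the connector provides the key, NULL otherwise.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed DYNAMIC metadata resolution:
// a dynamic column is not validated against the connector's declared
// metadata keys; at read time it yields the value if the connector
// provides the key, and NULL otherwise.
public class DynamicMetadataSketch {

    static Object resolveDynamicMetadata(Map<String, Object> recordMetadata, String key) {
        // No upfront validation: an unknown key simply resolves to null.
        return recordMetadata.get(key);
    }

    public static void main(String[] args) {
        // Old Kafka: the `host_name` metadata key does not exist yet.
        Map<String, Object> oldKafka = new HashMap<>();
        oldKafka.put("offset", 42L);

        // New Kafka: the `host_name` metadata key has been added.
        Map<String, Object> newKafka = new HashMap<>(oldKafka);
        newKafka.put("host_name", "broker-1");

        System.out.println(resolveDynamicMetadata(oldKafka, "host_name")); // null
        System.out.println(resolveDynamicMetadata(newKafka, "host_name")); // broker-1
    }
}
```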
Thanks.

Jark Wu <imj...@gmail.com> wrote on Tue, Aug 30, 2022 at 20:24:

> Thank you Ran for the explanation.
>
> The column DEFAULT is a reasonable feature and can also help in other
> cases.
> I'm fine with adding this feature.
> Do you want to prepare a FLIP for it?
>
> Best,
> Jark
>
> > On Aug 29, 2022, at 15:02, Ran Tao <chucheng...@gmail.com> wrote:
> >
> > Hi Jark. Timo summed it up very well. In fact, my problem is that the
> > current Flink table metadata is fixed and cannot keep up with changes
> > in a connector's metadata columns.
> > A metadata column that did not exist in the past may exist at some
> > point in the future, and vice versa.
> > There are forward- and backward-compatibility concerns here.
> >
> > Jark Wu <imj...@gmail.com> wrote on Fri, Aug 26, 2022 at 16:28:
> >
> >> Hi Ran,
> >>
> >> If the metadata is from the message properties, then you can manually
> >> cast it to your preferred type, such as
> >> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
> >>
> >> If the metadata is not from the message properties, how does the
> >> connector know which field to convert from?
> >> Shouldn't the connector be modified to support this new metadata column?
> >>
> >> Best,
> >> Jark
> >>
> >>> On Aug 26, 2022, at 15:30, Ran Tao <chucheng...@gmail.com> wrote:
> >>>
> >>> Hi, Timo. I think using one map column as in the Debezium format you
> >>> illustrated above can't cover the discussed scenario.
> >>> It's not the same thing.
> >>>
> >>> Here is a Debezium format example from the Flink docs: [1]
> >>>
> >>> ```
> >>> CREATE TABLE KafkaTable (
> >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >>>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >>>   user_id BIGINT
> >>> ) WITH (
> >>>   'connector' = 'kafka',
> >>>   'value.format' = 'debezium-json'
> >>>   ...
> >>> );
> >>> ```
> >>>
> >>> The `origin_properties` column is used for properties, so we define it
> >>> as a MAP (just as you responded). But the other metadata columns have
> >>> their own data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten
> >>> all metadata columns into one MAP<STRING, STRING> column; that's not a
> >>> good idea.
> >>>
> >>> My suggestion concerns the case where the connector above adds some
> >>> new metadata keys (just as an example; Kafka itself may be stable, but
> >>> a certain connector or middleware might still be evolving, so its
> >>> metadata could be added or changed). Say that at some point Kafka adds
> >>> a `host_name` metadata key (indicating the address of the message
> >>> broker).
> >>>
> >>> We could then define the SQL like this:
> >>> ```
> >>> CREATE TABLE KafkaTable (
> >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >>>   host_name STRING METADATA VIRTUAL DYNAMIC,
> >>>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >>>   user_id BIGINT
> >>> ) WITH (
> >>>   'connector' = 'kafka',
> >>>   'value.format' = 'debezium-json'
> >>>   ...
> >>> );
> >>> ```
> >>> Users can then use the `host_name` metadata. Because it is a DYNAMIC
> >>> metadata column, Flink does not throw an exception even though
> >>> `host_name` did not belong to Kafka before, and developers do not need
> >>> to modify or rebuild the Flink source code and redeploy it to the
> >>> online environment (which comes at a high cost).
> >>>
> >>> Considering the return value:
> >>> old Kafka (metadata key not present): NULL
> >>> new Kafka (metadata key added): the concrete value
> >>>
> >>> The same user SQL keeps working in the past, now, and in the future,
> >>> rather than rejecting these new metadata columns or frequently
> >>> modifying the connector implementation to support them. And it is
> >>> opt-in, configured with 'DYNAMIC' on the metadata column (or some
> >>> better mechanism).
> >>> > >>> [1] > >>> > >> > https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/ > >>> > >>> Timo Walther <twal...@apache.org> 于2022年8月25日周四 21:07写道: > >>> > >>>> Hi Ran, > >>>> > >>>> what would be the data type of this dynamic metadata column? The > planner > >>>> and many parts of the stack will require a data type. > >>>> > >>>> Personally, I feel connector developers can already have the same > >>>> functionality by declaring a metadata column as `MAP<STRING, STRING>`. > >>>> This is what we expose already as `debezium.source.properties`. > Whatever > >>>> Debezium adds will be available through this property and can be > >>>> accessed via `SELECT col['my-new-property'] FROM x` including being > NULL > >>>> be default if not present. > >>>> > >>>> Regards, > >>>> Timo > >>>> > >>>> > >>>> On 25.08.22 14:04, Ran Tao wrote: > >>>>> ``` > >>>>> create table test_source( > >>>>> __test_metadata__ varchar METADATA, > >>>>> f0 varchar, > >>>>> f1 varchar, > >>>>> f2 bigint, > >>>>> ts as CURRENT_TIMESTAMP > >>>>> ) with( > >>>>> 'connector'='test', > >>>>> ... > >>>>> ) > >>>>> ``` > >>>>> > >>>>> If we not pre define `__test_metadata__` as meta keys by implementing > >>>>> listReadableMetadata, run the above sql, it will cause exception like > >>>> this: > >>>>> > >>>>> org.apache.flink.table.api.ValidationException: Invalid metadata key > >>>>> '__test_metadata__' in column '__test_metadata__' of table > >>>>> 'default_catalog.default_database.test_source'. 
> >>>>> The DynamicTableSource class
> >>>>> 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> >>>>> supports the following metadata keys for reading:
> >>>>> xxx, yyy
> >>>>>
> >>>>> at
> >>>>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> >>>>>
> >>>>> Because currently a Flink metadata column must exist in the results
> >>>>> returned by `listReadableMetadata`, when a certain connector adds
> >>>>> new metadata keys we cannot use them directly unless we modify the
> >>>>> connector code to support them. In some situations this is
> >>>>> intolerable. Can we support a 'DYNAMIC' metadata column? Its basic
> >>>>> mechanism is to skip validation against the existing metadata keys
> >>>>> so that users can define such a column dynamically. If a connector
> >>>>> does not provide this metadata, the column value is NULL; otherwise
> >>>>> it is the concrete value. This has great benefits in some scenarios.
> >>>>>
> >>>>> Looking forward to your opinions.
> >>>
> >>> --
> >>> Best Regards,
> >>> Ran Tao
> >>> https://github.com/chucheng92
> >
> > --
> > Best Regards,
> > Ran Tao
> > https://github.com/chucheng92

--
Best Regards,
Ran Tao
https://github.com/chucheng92
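The validation discussed in this thread, and how a DYNAMIC flag could bypass it, can be sketched in plain Java. This is a simplified, hypothetical sketch, not the actual planner code: the real check lives in Flink's `DynamicSourceUtils.validateAndApplyMetadata`, and the `isDynamic` flag is the proposal, not an existing feature.

```java
import java.util.Set;

// Simplified sketch of the metadata-key validation discussed in this
// thread. Today, every metadata column must appear in the keys returned
// by the connector's listReadableMetadata(); the proposal is to skip
// that check for columns declared DYNAMIC (hypothetical flag).
public class MetadataValidationSketch {

    static void validateMetadataColumn(
            String columnName, boolean isDynamic, Set<String> readableKeys) {
        if (isDynamic) {
            return; // DYNAMIC columns are not validated; missing keys read as NULL.
        }
        if (!readableKeys.contains(columnName)) {
            throw new IllegalArgumentException(
                "Invalid metadata key '" + columnName + "'. Supported keys: " + readableKeys);
        }
    }

    public static void main(String[] args) {
        Set<String> supported = Set.of("xxx", "yyy");

        // Today's behavior: an undeclared key fails validation.
        try {
            validateMetadataColumn("__test_metadata__", false, supported);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Proposed behavior: a DYNAMIC column passes validation.
        validateMetadataColumn("__test_metadata__", true, supported);
        System.out.println("dynamic column accepted");
    }
}
```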