Hi, Jark & Timo. I'm glad to work on this feature. If you agree, I'll
prepare a FLIP, and then you and other developers can review and check
the specifics.

Thanks.

Jark Wu <imj...@gmail.com> wrote on Tue, Aug 30, 2022 at 20:24:

> Thank you Ran for the explanation.
>
> The column DEFAULT is a reasonable feature and can also help in other
> cases.
> I’m fine with adding this feature.
> Do you want to prepare a FLIP for it?
>
> Best,
> Jark
>
> > On Aug 29, 2022, at 15:02, Ran Tao <chucheng...@gmail.com> wrote:
> >
> > Hi Jark. Timo summed it up very well. In fact, my problem is that the
> > current Flink table metadata keys are fixed and cannot accommodate
> > changes in a connector's metadata columns.
> > A metadata column that did not exist in the past may exist at some
> > point in the future, and vice versa.
> > Both forward and backward compatibility are involved here.
> >
> > Jark Wu <imj...@gmail.com> wrote on Fri, Aug 26, 2022 at 16:28:
> >
> >> Hi Ran,
> >>
> >> If the metadata is from the message properties, then you can manually
> >> cast it to your preferred type, such as
> >> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
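> >>
> >> A minimal sketch of that workaround (assuming a hypothetical connector
> >> that exposes a `properties` metadata map; the table, connector, and
> >> key names are placeholders, not an existing API):
> >>
> >> ```
> >> CREATE TABLE MyTable (
> >>   properties MAP<STRING, STRING> METADATA VIRTUAL,
> >>   -- computed column: cast the raw string entry to a typed column;
> >>   -- evaluates to NULL when the key is absent
> >>   my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP(3)),
> >>   user_id BIGINT
> >> ) WITH (
> >>   'connector' = 'some-connector'  -- placeholder
> >> );
> >> ```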
> >>
> >> If the metadata is not from the message properties, how does the
> >> connector know which field to convert from?
> >> Shouldn't the connector be modified to support this new metadata column?
> >>
> >> Best,
> >> Jark
> >>
> >>
> >>
> >>> On Aug 26, 2022, at 15:30, Ran Tao <chucheng...@gmail.com> wrote:
> >>>
> >>> Hi, Timo. I think the single map column in the Debezium format you
> >>> illustrated above can't cover the discussed scenario.
> >>> It's not the same thing.
> >>>
> >>> Here is a Debezium format example from the Flink docs: [1]
> >>>
> >>> ```
> >>> CREATE TABLE KafkaTable (
> >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >>>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >>>   user_id BIGINT
> >>> ) WITH (
> >>>   'connector' = 'kafka',
> >>>   'value.format' = 'debezium-json'
> >>>   ...
> >>> );
> >>> ```
> >>>
> >>> The `origin_properties` column is used for properties, so we define
> >>> it as a MAP (just as you suggested). But the other metadata columns
> >>> have their own data types, e.g. `origin_ts` is a TIMESTAMP. We cannot
> >>> flatten all metadata columns into one MAP<STRING, STRING> column;
> >>> it's not a good idea.
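> >>>
> >>> For illustration, if `origin_ts` existed only inside the map, every
> >>> query would have to re-cast it by hand (a sketch, assuming the
> >>> timestamp were exposed as a string entry under a hypothetical
> >>> 'ingestion-timestamp' key):
> >>>
> >>> ```
> >>> SELECT
> >>>   -- the native TIMESTAMP(3) type is lost; every user must know the
> >>>   -- key name and the target type for each access
> >>>   CAST(origin_properties['ingestion-timestamp'] AS TIMESTAMP(3)) AS origin_ts,
> >>>   user_id
> >>> FROM KafkaTable;
> >>> ```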
> >>>
> >>> My suggestion covers the case where the Kafka connector above *adds
> >>> some new metadata keys* (just for example; Kafka itself may be
> >>> stable, but a certain connector or middleware might still be
> >>> evolving, so its metadata could be added or changed), e.g. at some
> >>> point Kafka adds a `host_name` metadata key (indicating the address
> >>> of the message broker).
> >>>
> >>> We could define the SQL like this:
> >>> ```
> >>> CREATE TABLE KafkaTable (
> >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >>>   host_name STRING METADATA VIRTUAL DYNAMIC,
> >>>   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >>>   user_id BIGINT
> >>> ) WITH (
> >>>   'connector' = 'kafka',
> >>>   'value.format' = 'debezium-json'
> >>>   ...
> >>> );
> >>> ```
> >>> Then users can use the `host_name` metadata: because it's a DYNAMIC
> >>> metadata column, Flink doesn't throw an exception even though
> >>> `host_name` didn't belong to Kafka before, and developers don't need
> >>> to modify or rebuild the Flink source code and redeploy it to the
> >>> online environment (which comes at a high cost).
> >>>
> >>> Considering the return value:
> >>> Kafka before (without this metadata): NULL
> >>> Kafka now (with this metadata added): the concrete value
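> >>>
> >>> A sketch of how the same query would behave across connector
> >>> versions:
> >>>
> >>> ```
> >>> -- old connector version (key unknown): host_name is NULL
> >>> -- new connector version (key exposed): host_name holds the broker address
> >>> SELECT host_name, user_id FROM KafkaTable;
> >>> ```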
> >>>
> >>> The same user SQL works in the past, now, and even in the future,
> >>> rather than checking and rejecting these new metadata columns or
> >>> frequently modifying connector implementations to support them.
> >>> And it's opt-in, configured by adding 'DYNAMIC' to the metadata
> >>> column (or some other, better implementation).
> >>>
> >>> [1]
> >>> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> >>>
> >>> Timo Walther <twal...@apache.org> wrote on Thu, Aug 25, 2022 at 21:07:
> >>>
> >>>> Hi Ran,
> >>>>
> >>>> what would be the data type of this dynamic metadata column? The
> >>>> planner and many parts of the stack will require a data type.
> >>>>
> >>>> Personally, I feel connector developers can already get the same
> >>>> functionality by declaring a metadata column as `MAP<STRING, STRING>`.
> >>>> This is what we already expose as `debezium.source.properties`.
> >>>> Whatever Debezium adds will be available through this property and
> >>>> can be accessed via `SELECT col['my-new-property'] FROM x`, including
> >>>> being NULL by default if not present.
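> >>>>
> >>>> A sketch of that access pattern, reusing the placeholder names `col`
> >>>> and `x` from above:
> >>>>
> >>>> ```
> >>>> -- map access yields NULL when the key is absent, so the query keeps
> >>>> -- working even before the source system emits the new property
> >>>> SELECT col['my-new-property'] FROM x;
> >>>> ```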
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 25.08.22 14:04, Ran Tao wrote:
> >>>>> ```
> >>>>> create table test_source(
> >>>>>   __test_metadata__ varchar METADATA,
> >>>>>   f0 varchar,
> >>>>>   f1 varchar,
> >>>>>   f2 bigint,
> >>>>>   ts as CURRENT_TIMESTAMP
> >>>>> ) with (
> >>>>>   'connector' = 'test',
> >>>>>   ...
> >>>>> )
> >>>>> ```
> >>>>>
> >>>>> If we don't pre-define `__test_metadata__` as a metadata key by
> >>>>> implementing `listReadableMetadata`, running the above SQL causes
> >>>>> an exception like this:
> >>>>>
> >>>>> org.apache.flink.table.api.ValidationException: Invalid metadata key
> >>>>> '__test_metadata__' in column '__test_metadata__' of table
> >>>>> 'default_catalog.default_database.test_source'. The DynamicTableSource
> >>>>> class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> >>>>> supports the following metadata keys for reading:
> >>>>> xxx, yyy
> >>>>>
> >>>>> at org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> >>>>>
> >>>>> This is because currently a Flink metadata column must exist in the
> >>>>> results returned by `listReadableMetadata`. But when a certain
> >>>>> connector adds some metadata keys, we cannot use them directly
> >>>>> unless we modify the connector code to support them. In some
> >>>>> situations this can be intolerable. Can we support a 'DYNAMIC
> >>>>> metadata column'? Its basic mechanism is to skip validating the
> >>>>> column against the existing metadata keys and let users define it
> >>>>> dynamically. If a certain connector doesn't have this metadata, the
> >>>>> column returns NULL; otherwise it returns the concrete value. This
> >>>>> has great benefits in some scenarios.
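> >>>>>
> >>>>> A sketch of how this might look applied to the table above (the
> >>>>> DYNAMIC keyword is the proposed, not-yet-existing syntax):
> >>>>>
> >>>>> ```
> >>>>> create table test_source(
> >>>>>   -- not validated against listReadableMetadata; returns NULL if
> >>>>>   -- the connector doesn't expose this key
> >>>>>   __test_metadata__ varchar METADATA VIRTUAL DYNAMIC,
> >>>>>   f0 varchar,
> >>>>>   f1 varchar,
> >>>>>   f2 bigint
> >>>>> ) with (
> >>>>>   'connector' = 'test',
> >>>>>   ...
> >>>>> )
> >>>>> ```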
> >>>>>
> >>>>> Looking forward to your opinions.
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>> --
> >>> Best Regards,
> >>> Ran Tao
> >>> https://github.com/chucheng92
> >>
> >>
> >
> > --
> > Best Regards,
> > Ran Tao
> > https://github.com/chucheng92
>
>

-- 
Best Regards,
Ran Tao
https://github.com/chucheng92
