Hi Jark. Timo summed it up very well. In fact, my problem is that the
current Flink table metadata is fixed and cannot stay compatible with
changes in a connector's metadata columns.
A metadata column that did not exist in the past may exist at some point
in the future, and vice versa.
There is a forward and backward compatibility concern here.

On Fri, Aug 26, 2022 at 16:28, Jark Wu <imj...@gmail.com> wrote:

> Hi Ran,
>
> If the metadata is from the message properties, then you can manually
> cast it to your preferred type, such as
> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
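>
> For example, a full DDL might look like this (just a sketch; it assumes
> the connector exposes a `properties` metadata column of type
> MAP<STRING, STRING>):
>
> ```
> CREATE TABLE MyTable (
>   -- the generic property bag exposed by the connector (assumed name)
>   properties MAP<STRING, STRING> METADATA VIRTUAL,
>   -- computed column: manually cast the new property to the desired type
>   my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP),
>   user_id BIGINT
> ) WITH (
>   'connector' = 'kafka',
>   ...
> );
> ```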
>
> If the metadata is not from the message properties, how does the connector
> know which field to convert from?
> Shouldn’t the connector be modified to support this new metadata column?
>
> Best,
> Jark
>
>
>
> > On Aug 26, 2022, at 15:30, Ran Tao <chucheng...@gmail.com> wrote:
> >
> > Hi, Timo. I think using one map column in the Debezium format you
> > illustrated above can't cover the discussed scenario.
> > It's not the same thing.
> >
> > Here is a debezium format example from flink docs: [1]
> >
> > ```
> > CREATE TABLE KafkaTable (
> >  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >  user_id BIGINT,
> > ) WITH (
> >  'connector' = 'kafka',
> >  'value.format' = 'debezium-json'
> >  ...
> > );
> > ```
> >
> > The `origin_properties` column is used for properties, so we define it
> > as a MAP (just as you suggested). But the other metadata columns have
> > their own data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten
> > all metadata columns into one MAP<STRING, STRING> column; it's not a
> > good idea.
> >
> > My suggestion is for the case where the Kafka connector above *adds
> > some new metadata* (just as an example; Kafka may be stable, but a
> > certain connector or middleware might still be developing, so its
> > metadata could be added or changed).
> > E.g. suppose that at some point Kafka added a `host_name` metadata key
> > (indicating the address of the message broker).
> >
> > We could then define SQL like this:
> > ```
> > CREATE TABLE KafkaTable (
> >  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >  host_name STRING METADATA VIRTUAL DYNAMIC,
> >  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >  user_id BIGINT,
> > ) WITH (
> >  'connector' = 'kafka',
> >  'value.format' = 'debezium-json'
> >  ...
> > );
> > ```
> > Then users can use this `host_name` metadata. Because it's a DYNAMIC
> > metadata column, Flink doesn't throw an exception even though
> > `host_name` did not belong to Kafka before, and developers don't need
> > to modify or rebuild the Flink source code and republish Flink to the
> > online environment (which comes at a high cost).
> >
> > Considering the return value:
> > Kafka before (without this metadata): null
> > Kafka now (having added this metadata): the concrete value
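> >
> > To illustrate the intended semantics (a sketch of the proposal, not
> > existing behavior):
> >
> > ```
> > -- the same query on any connector version:
> > SELECT host_name, user_id FROM KafkaTable;
> > -- old connector (no 'host_name' metadata): host_name is NULL
> > -- new connector (with 'host_name'): host_name is the broker address
> > ```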
> >
> > The same user SQL works in the past, now, and even in the future,
> > rather than Flink checking and rejecting these new metadata columns,
> > or the connector implementation being modified frequently to support
> > them. And it is opt-in, configured with 'DYNAMIC' on the metadata
> > column (or some better implementation).
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> >
> > On Thu, Aug 25, 2022 at 21:07, Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Ran,
> >>
> >> what would be the data type of this dynamic metadata column? The planner
> >> and many parts of the stack will require a data type.
> >>
> >> Personally, I feel connector developers can already have the same
> >> functionality by declaring a metadata column as `MAP<STRING, STRING>`.
> >> This is what we expose already as `debezium.source.properties`. Whatever
> >> Debezium adds will be available through this property and can be
> >> accessed via `SELECT col['my-new-property'] FROM x`, including being
> >> NULL by default if not present.
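> >>
> >> For example, with the `origin_properties` column from the Debezium
> >> DDL in the docs (map access on a missing key yields NULL):
> >>
> >> ```
> >> SELECT origin_properties['my-new-property'] AS my_new_property
> >> FROM KafkaTable;
> >> ```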
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 25.08.22 14:04, Ran Tao wrote:
> >>> ```
> >>> create table test_source(
> >>>  __test_metadata__ varchar METADATA,
> >>>  f0 varchar,
> >>>  f1 varchar,
> >>>  f2 bigint,
> >>>  ts as CURRENT_TIMESTAMP
> >>> ) with(
> >>>  'connector'='test',
> >>>   ...
> >>> )
> >>> ```
> >>>
> >>> If we have not pre-defined `__test_metadata__` as a metadata key by
> >>> implementing `listReadableMetadata`, running the above SQL causes an
> >>> exception like this:
> >>>
> >>> org.apache.flink.table.api.ValidationException: Invalid metadata key
> >>> '__test_metadata__' in column '__test_metadata__' of table
> >>> 'default_catalog.default_database.test_source'. The DynamicTableSource
> >>> class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> >>> supports the following metadata keys for reading:
> >>> xxx, yyy
> >>>
> >>> at
> >>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> >>>
> >>> This is because a Flink metadata column currently must exist in the
> >>> results returned by `listReadableMetadata`. So when a connector adds
> >>> new metadata, we cannot use it directly unless we modify the connector
> >>> code to support it. In some situations this is intolerable. Can we
> >>> support a 'DYNAMIC MetadataColumn'? Its basic mechanism is to skip
> >>> validating the column against the connector's existing metadata keys,
> >>> so users can define it dynamically. If a connector does not provide
> >>> this metadata, the column returns null; otherwise it returns the
> >>> concrete value. This has great benefits in some scenarios.
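> >>>
> >>> Today, one workaround is to reference a key the connector already
> >>> declares, e.g. (a sketch; 'xxx' stands for a key actually returned by
> >>> `listReadableMetadata`, as in the exception message above):
> >>>
> >>> ```
> >>> create table test_source(
> >>>  __test_metadata__ varchar METADATA FROM 'xxx',
> >>>  ...
> >>> )
> >>> ```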
> >>>
> >>> Looking forward to your opinions.
> >>>
> >>>
> >>
> >>
> >
> > --
> > Best Regards,
> > Ran Tao
> > https://github.com/chucheng92
>
>

-- 
Best Regards,
Ran Tao
https://github.com/chucheng92
