[ 
https://issues.apache.org/jira/browse/FLINK-37132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946051#comment-17946051
 ] 

Yanquan Lv edited comment on FLINK-37132 at 4/21/25 6:24 AM:
-------------------------------------------------------------

Hi, [~MOBIN].

Consider a user who wants to add an identifier_name field to every table, and 
also wants extra fields on one specific table.  
The job would be written as:
{panel}
transform:
 - source-table: mydb.web_order
   projection: *, order_id, UPPER(product_name) as product_name,
     __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS
     identifier_name, __data_event_type__ AS type, op_ts AS opts
 - source-table: \.*.\.*
   projection: *, __namespace_name__ || '.' || __schema_name__ || '.' ||
     __table_name__ AS identifier_name, __data_event_type__ AS type, op_ts AS
     opts
{panel}
Because transform has no exclude.table option, the specific table cannot be 
excluded from the catch-all rule. If we do not support this kind of 
configuration, it will be difficult for users to meet this need.

 


was (Author: JIRAUSER304414):
Hi, [~MOBIN].

Consider the situation that the user wishes to add an identifier_name field to 
all tables, but adds an additional field to a certain table.  
The job would be written as:
transform:
 - source-table: mydb.web_order
   projection: *, order_id, UPPER(product_name) as product_name,
     __namespace_name__ || '.' || __schema_name__ || '.' ||
     __table_name__ identifier_name
   partition-keys: product_name
 - source-table: \.*.\.*
   projection: *, __namespace_name__ || '.' || __schema_name__ || '.' ||
     __table_name__ identifier_name

Because we do not have the exclude.table function in transform, if we do not 
support this kind of configuration, it will be difficult for users to meet 
their needs.

 

> Add schema validation in Multi Transform
> ----------------------------------------
>
>                 Key: FLINK-37132
>                 URL: https://issues.apache.org/jira/browse/FLINK-37132
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0, cdc-3.2.1
>            Reporter: MOBIN
>            Assignee: MOBIN
>            Priority: Major
>              Labels: pull-request-available
>
> The following scenario should throw a "different column counts" exception:
> {code:java}
> void testMultiTransformSchemaColumnsCompatibilityWithDiffColumnCount(
>         ValuesDataSink.SinkApi sinkApi) {
>     assertThatThrownBy(
>                     () ->
>                             runGenericTransformTest(
>                                     sinkApi,
>                                     Arrays.asList(
>                                             new TransformDef(
>                                                     "default_namespace.default_schema.mytable2",
>                                                     "*",
>                                                     "age < 18",
>                                                     null,
>                                                     null,
>                                                     null,
>                                                     null,
>                                                     null),
>                                             new TransformDef(
>                                                     "default_namespace.default_schema.mytable2",
>                                                     // reference part column
>                                                     "id,UPPER(name) AS name",
>                                                     "age >= 18",
>                                                     null,
>                                                     null,
>                                                     null,
>                                                     null,
>                                                     null)),
>                                     Collections.emptyList()))
>             .rootCause()
>             .isExactlyInstanceOf(IllegalStateException.class)
>             .hasMessage(
>                     "Unable to merge schema columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
>                             + "and columns={`id` BIGINT NOT NULL,`name` STRING}, primaryKeys=id, options=() with different column counts.");
> }
> {code}
> In Multi Transform, metadata fields like primaryKeys, partitionKeys, and 
> options also need to be consistent.
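
The consistency requirement described above can be sketched as a small check. This is only an illustration under stated assumptions: MultiTransformCheck, SchemaSketch, checkCompatible, and requireSame are hypothetical names, not Flink CDC classes or methods, and the error messages are invented for the sketch. It compares column counts first, then requires primaryKeys, partitionKeys, and options to be equal.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these names come from Flink CDC. */
final class MultiTransformCheck {

    /** Minimal stand-in for the schema produced by one transform rule. */
    static final class SchemaSketch {
        final List<String> columns;
        final List<String> primaryKeys;
        final List<String> partitionKeys;
        final Map<String, String> options;

        SchemaSketch(
                List<String> columns,
                List<String> primaryKeys,
                List<String> partitionKeys,
                Map<String, String> options) {
            this.columns = columns;
            this.primaryKeys = primaryKeys;
            this.partitionKeys = partitionKeys;
            this.options = options;
        }
    }

    /** Throws IllegalStateException if the two schemas disagree. */
    static void checkCompatible(SchemaSketch a, SchemaSketch b) {
        // Column counts must match before anything else is compared.
        if (a.columns.size() != b.columns.size()) {
            throw new IllegalStateException(
                    "Unable to merge schemas with different column counts: "
                            + a.columns.size() + " vs " + b.columns.size());
        }
        // Metadata fields must also be identical across rules.
        requireSame("primaryKeys", a.primaryKeys, b.primaryKeys);
        requireSame("partitionKeys", a.partitionKeys, b.partitionKeys);
        requireSame("options", a.options, b.options);
    }

    private static void requireSame(String field, Object left, Object right) {
        if (!left.equals(right)) {
            throw new IllegalStateException(
                    "Unable to merge schemas with different " + field
                            + ": " + left + " vs " + right);
        }
    }
}
```

The sketch deliberately ignores column names and types; a real check would likely compare those as well.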



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
