lvyanquan commented on code in PR #3468:
URL: https://github.com/apache/flink-cdc/pull/3468#discussion_r1673449557


##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java:
##########
@@ -335,6 +337,65 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.table1",
+                        "*,concat(col1,'0') as col12,__op_type__ as op",
+                        "col1 <> '3'",
+                        "col1",
+                        "col12",
+                        "key1=value1",
+                        "");
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        new ArrayList<>(Arrays.asList(transformDef)),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`__op_type__` STRING NOT NULL,`col12` STRING,`op` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, INSERT, 10, INSERT], op=INSERT, meta=()}",

Review Comment:
   Should we stay consistent with https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#available-metadata and use `row_kind` as the key, returning `+I`, `-D`, `+U`? However, we don't actually have separate `+U` & `-U` kinds, and there is also a historical inconsistency for `database_name` and `table_name`, since we previously used different keys for them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
