lvyanquan commented on code in PR #3468: URL: https://github.com/apache/flink-cdc/pull/3468#discussion_r1673449557
########## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java: ########## @@ -335,6 +337,65 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}"); } + @ParameterizedTest + @EnumSource + void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + TransformDef transformDef = + new TransformDef( + "default_namespace.default_schema.table1", + "*,concat(col1,'0') as col12,__op_type__ as op", + "col1 <> '3'", + "col1", + "col12", + "key1=value1", + ""); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + new ArrayList<>(Arrays.asList(transformDef)), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`__op_type__` STRING NOT NULL,`col12` STRING,`op` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, INSERT, 10, INSERT], op=INSERT, meta=()}", Review Comment: Should we keep in consistence with https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#available-metadata to use `row_kind` as key and return `+I`, `-D`,`+U` ? But we don't actually have `+U` & `-U` kind, and This also has historical issues for `database_name` and `table_name` as we used another keys for them before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org