yuxiqian commented on code in PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1814417426
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java: ########## @@ -50,6 +50,7 @@ public class ValuesDataSourceHelper { */ public enum EventSetId { SINGLE_SPLIT_SINGLE_TABLE, + SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE, Review Comment: Maybe `ConfigOption<ValuesDataSourceHelper.EventSetId>`'s description needs to be updated correspondingly ########## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java: ########## @@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}"); } + @ParameterizedTest + @EnumSource + void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi) + throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List<String> results = ValuesDatabase.getResults(TABLE_1); + assertThat(results) + .contains( Review Comment: Will `.containsExactlyInAnyOrder` be more suitable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org