yuxiqian commented on code in PR #3622: URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1815882769
########## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java: ########## @@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}"); } + @ParameterizedTest + @EnumSource + void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi) + throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List<String> results = ValuesDatabase.getResults(TABLE_1); + assertThat(results) + .containsExactly( Review Comment: `ValuesDatabase` uses `HashMap` to store records internally, and does not guarantee orderness IIUC. `containsExactlyInAnyOrder` might be more appropriate. ########## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java: ########## @@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}"); } + @ParameterizedTest + @EnumSource + void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi) + throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List<String> results = ValuesDatabase.getResults(TABLE_1); + assertThat(results) + .containsExactly( Review Comment: `ValuesDatabase` uses `HashMap` to store records internally, and does not guarantee orderliness IIUC. `containsExactlyInAnyOrder` might be more appropriate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org