yuxiqian commented on code in PR #3622:
URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1814417426


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java:
##########
@@ -50,6 +50,7 @@ public class ValuesDataSourceHelper {
      */
     public enum EventSetId {
         SINGLE_SPLIT_SINGLE_TABLE,
+        SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE,

Review Comment:
   Maybe the description of `ConfigOption<ValuesDataSourceHelper.EventSetId>` 
needs to be updated accordingly, now that a new enum value has been added.



##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java:
##########
@@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check result in ValuesDatabase
+        List<String> results = ValuesDatabase.getResults(TABLE_1);
+        assertThat(results)
+                .contains(

Review Comment:
   Would `.containsExactlyInAnyOrder` be more suitable here? `.contains` still 
passes if the sink emits extra, unexpected events.
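
   To illustrate the difference, here is a rough plain-Java sketch of the two 
checks. It does not use AssertJ itself; the class `ContainsVsExact`, its helper 
methods, and the sample event strings are hypothetical and only approximate the 
semantics of `.contains` and `.containsExactlyInAnyOrder`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only (not part of AssertJ or Flink CDC).
public class ContainsVsExact {

    // Subset-style check: every expected element appears somewhere in actual.
    // Extra elements in actual are silently ignored.
    public static boolean containsAll(List<String> actual, List<String> expected) {
        return actual.containsAll(expected);
    }

    // Exact check, order-insensitive: the same elements with the same
    // multiplicity, and nothing else.
    public static boolean containsExactlyInAnyOrder(
            List<String> actual, List<String> expected) {
        if (actual.size() != expected.size()) {
            return false;
        }
        List<String> remaining = new ArrayList<>(actual);
        for (String element : expected) {
            // remove(Object) drops one matching occurrence, so duplicates are counted.
            if (!remaining.remove(element)) {
                return false;
            }
        }
        return remaining.isEmpty();
    }

    public static void main(String[] args) {
        // Placeholder strings standing in for the stringified events in the test.
        List<String> expected = Arrays.asList("event-a", "event-b");
        List<String> actual = Arrays.asList("event-b", "event-a", "unexpected-event");

        System.out.println(containsAll(actual, expected));               // true
        System.out.println(containsExactlyInAnyOrder(actual, expected)); // false
    }
}
```

   With the subset-style check the stray `unexpected-event` goes unnoticed, 
whereas the exact check fails, which is usually what a result assertion in an 
integration test should do.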



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

