yuxiqian commented on code in PR #3622:
URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1815882769


##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java:
##########
@@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi 
sinkApi) throws Exception
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 
null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi 
sinkApi)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check result in ValuesDatabase
+        List<String> results = ValuesDatabase.getResults(TABLE_1);
+        assertThat(results)
+                .containsExactly(

Review Comment:
   `ValuesDatabase` uses `HashMap` to store records internally, and does not 
guarantee orderness IIUC. `containsExactlyInAnyOrder` might be more appropriate.



##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java:
##########
@@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi 
sinkApi) throws Exception
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 
null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi 
sinkApi)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check result in ValuesDatabase
+        List<String> results = ValuesDatabase.getResults(TABLE_1);
+        assertThat(results)
+                .containsExactly(

Review Comment:
   `ValuesDatabase` uses `HashMap` to store records internally, and does not 
guarantee orderliness IIUC. `containsExactlyInAnyOrder` might be more 
appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to