SML0127 commented on code in PR #3658:
URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861726630


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java:
##########
@@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) 
throws Exception {
                 .isEqualTo(expectedStreamRecord);
     }
 
+    private void testJsonDataType(UniqueDatabase database, Boolean 
useLegacyJsonFormat)
+            throws Exception {
+        database.createAndInitialize();
+        CloseableIterator<Event> iterator =
+                env.fromSource(
+                                getFlinkSourceProvider(
+                                                new String[] {"json_types"},
+                                                database,
+                                                useLegacyJsonFormat)
+                                        .getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                "Event-Source")
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+                    BinaryStringData.fromString("{\"key1\": \"value1\"}"),
+                    BinaryStringData.fromString("{\"key1\": \"value1\", 
\"key2\": \"value2\"}"),
+                    BinaryStringData.fromString(
+                            "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": 
\"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": 
[\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
+                    1
+                };
+
+        // skip CreateTableEvent
+        List<Event> snapshotResults =
+                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 
1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(0)).after();
+        Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
JSON_TYPES))
+                .isEqualTo(expectedSnapshot);
+
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("UPDATE json_types SET int_c = null WHERE id = 
1;");
+        }
+
+        Object[] expectedStreamRecord = expectedSnapshot;
+
+        if (useLegacyJsonFormat) {
+            expectedSnapshot[1] = 
BinaryStringData.fromString("{\"key1\":\"value1\"}");
+            expectedSnapshot[2] =
+                    
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}");
+            expectedSnapshot[3] =
+                    BinaryStringData.fromString(
+                            
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]");
+        }
+        expectedSnapshot[4] = null;

Review Comment:
   I revised it more clearly as shown below.
   
   ```java
   if (useLegacyJsonFormat) {
       // removed whitespace before value and after comma in json format string 
value
       Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, 
JSON_TYPES))
               .containsExactly(
                       DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
                       BinaryStringData.fromString("{\"key1\":\"value1\"}"),
                       BinaryStringData.fromString(
                               "{\"key1\":\"value1\",\"key2\":\"value2\"}"),
                       BinaryStringData.fromString(
                               
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
                       null);
   } else {
       Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, 
JSON_TYPES))
               .containsExactly(expectedStreamRecord);
   }
   ```   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to