SML0127 commented on code in PR #3658: URL: https://github.com/apache/flink-cdc/pull/3658#discussion_r1861726630
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java: ########## @@ -457,6 +495,61 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception { .isEqualTo(expectedStreamRecord); } + private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat) + throws Exception { + database.createAndInitialize(); + CloseableIterator<Event> iterator = + env.fromSource( + getFlinkSourceProvider( + new String[] {"json_types"}, + database, + useLegacyJsonFormat) + .getSource(), + WatermarkStrategy.noWatermarks(), + "Event-Source") + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), + BinaryStringData.fromString("{\"key1\": \"value1\"}"), + BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"), + BinaryStringData.fromString( + "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"), + 1 + }; + + // skip CreateTableEvent + List<Event> snapshotResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES)) + .isEqualTo(expectedSnapshot); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;"); + } + + Object[] expectedStreamRecord = expectedSnapshot; + + if (useLegacyJsonFormat) { + expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); + expectedSnapshot[2] = + BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}"); + expectedSnapshot[3] = + 
BinaryStringData.fromString( + "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"); + } + expectedSnapshot[4] = null; Review Comment: I revised it to be clearer, as shown below. ```java if (useLegacyJsonFormat) { // removed whitespace before value and after comma in json format string value Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) .containsExactly( DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), BinaryStringData.fromString("{\"key1\":\"value1\"}"), BinaryStringData.fromString( "{\"key1\":\"value1\",\"key2\":\"value2\"}"), BinaryStringData.fromString( "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"), null); } else { Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) .containsExactly(expectedStreamRecord); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org