wuchong commented on a change in pull request #14693: URL: https://github.com/apache/flink/pull/14693#discussion_r561709274
########## File path: flink-formats/flink-json/src/test/resources/canal-data.txt ########## @@ -1,5 +1,5 @@ {"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"inventory","es":1589373515000,"id":3,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames ":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373515477,"type":"INSERT"} Review comment: ditto. ########## File path: flink-formats/flink-json/src/test/resources/canal-data-filter-table.txt ########## @@ -1,5 +1,5 @@ {"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["i d"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"} Review comment: ```suggestion {"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlTy pe":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"} ``` I think you want to test the original `description` is null. ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java ########## @@ -238,8 +244,9 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws // the underlying JSON deserialization schema always produce GenericRowData. GenericRowData after = (GenericRowData) data.getRow(i, fieldCount); GenericRowData before = (GenericRowData) old.getRow(i, fieldCount); + final JsonNode oldField = root.get(FIELD_OLD); for (int f = 0; f < fieldCount; f++) { - if (before.isNullAt(f)) { + if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) { // not null fields in "old" (before) means the fields are changed // null/empty fields in "old" (before) means the fields are not changed Review comment: ```suggestion // fields in "old" (before) means the fields are changed // fields not in "old" (before) means the fields are not changed ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org