wuchong commented on a change in pull request #14693:
URL: https://github.com/apache/flink/pull/14693#discussion_r561709274



##########
File path: flink-formats/flink-json/src/test/resources/canal-data.txt
##########
@@ -1,5 +1,5 @@
 {"data":[{"id":"101","name":"scooter","description":"Small 2-wheel 
scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V 
car battery","weight":"8.1"},{"id":"103","name":"12-pack drill 
bits","description":"12-pack of drill bits with sizes ranging from #40 to 
#3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's 
hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz 
carpenter's 
hammer","weight":"0.875"},{"id":"106","name":"hammer","description":"16oz 
carpenter's 
hammer","weight":"1.0"},{"id":"107","name":"rocks","description":"box of 
assorted 
rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water 
resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare 
tire","description":"24 inch spare 
tire","weight":"22.2"}],"database":"inventory","es":1589373515000,"id":3,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames
 
":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373515477,"type":"INSERT"}

Review comment:
       ditto.

##########
File path: 
flink-formats/flink-json/src/test/resources/canal-data-filter-table.txt
##########
@@ -1,5 +1,5 @@
 {"data":[{"id":"101","name":"scooter","description":"Small 2-wheel 
scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V 
car battery","weight":"8.1"},{"id":"103","name":"12-pack drill 
bits","description":"12-pack of drill bits with sizes ranging from #40 to 
#3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's 
hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz 
carpenter's 
hammer","weight":"0.875"},{"id":"106","name":"hammer","description":"16oz 
carpenter's 
hammer","weight":"1.0"},{"id":"107","name":"rocks","description":"box of 
assorted 
rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water 
resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare 
tire","description":"24 inch spare 
tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["i
 
d"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"}

Review comment:
       ```suggestion
   {"data":[{"id":"101","name":"scooter","description":"Small 2-wheel 
scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V 
car battery","weight":"8.1"},{"id":"103","name":"12-pack drill 
bits","description":"12-pack of drill bits with sizes ranging from #40 to 
#3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's 
hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz 
carpenter's 
hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box
 of assorted 
rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water 
resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare 
tire","description":"24 inch spare 
tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlTy
 
pe":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"}
   ```
   
   I think you want to test the original `description` is null. 

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
##########
@@ -238,8 +244,9 @@ public void deserialize(@Nullable byte[] message, 
Collector<RowData> out) throws
                     // the underlying JSON deserialization schema always 
produce GenericRowData.
                     GenericRowData after = (GenericRowData) data.getRow(i, 
fieldCount);
                     GenericRowData before = (GenericRowData) old.getRow(i, 
fieldCount);
+                    final JsonNode oldField = root.get(FIELD_OLD);
                     for (int f = 0; f < fieldCount; f++) {
-                        if (before.isNullAt(f)) {
+                        if (before.isNullAt(f) && 
oldField.findValue(fieldNames.get(f)) == null) {
                             // not null fields in "old" (before) means the 
fields are changed
                             // null/empty fields in "old" (before) means the 
fields are not changed

Review comment:
       ```suggestion
                               // fields in "old" (before) means the fields are 
changed
                               // fields not in "old" (before) means the fields 
are not changed
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to