[ https://issues.apache.org/jira/browse/FLINK-20763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
WangRuoQi updated FLINK-20763:
------------------------------
    Attachment: canal_format.patch

> canal format parse update record with null value gets wrong result
> ------------------------------------------------------------------
>
>                 Key: FLINK-20763
>                 URL: https://issues.apache.org/jira/browse/FLINK-20763
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.11.2
>            Reporter: WangRuoQi
>            Priority: Major
>         Attachments: canal_format.patch
>
>
> When I use the Canal format to consume a MySQL binlog with a query like this:
> {code:java}
> select ymd, count(order_no), count(*) from order_table where status >= 3 group by ymd;
> {code}
> I get results like this:
> {code:java}
> (20201212,10,10)
> ..
> (20201212,20,24)
> ..
> (20201212,100,130)
> ..
> {code}
> I am sure that when status >= 3, every record has a valid order_no, yet I get a result where count(order_no) and count(*) differ.
> I found the cause while debugging:
> {code:java}
> insert into order_table(id,ymd,order_no,status) values(1,20201212,null,1);
> -- +I(1,20201212,null,1)
> update order_table set order_no=123,status=3 where id=1;
> -- -U(1,20201212,123,1)  # wrong, the correct log should be -U(1,20201212,null,1)
> -- +U(1,20201212,123,3)
> {code}
> So I noticed that the Canal format has a bug when parsing update records.
>
> The source code logic is:
> {code:java}
> // org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
> } else if (OP_UPDATE.equals(type)) {
>     // "data" field is an array of rows, containing the new rows
>     ArrayData data = row.getArray(0);
>     // "old" field is an array of rows, containing the old values
>     ArrayData old = row.getArray(1);
>     for (int i = 0; i < data.size(); i++) {
>         // the underlying JSON deserialization schema always produces GenericRowData
>         GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
>         GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
>         for (int f = 0; f < fieldCount; f++) {
>             if (before.isNullAt(f)) {
>                 // non-null fields in "old" (before) mean the fields were changed
>                 // null/empty fields in "old" (before) mean the fields were not changed
>                 // so we just copy the unchanged fields into before
>                 before.setField(f, after.getField(f));
>             }
>         }
>         before.setRowKind(RowKind.UPDATE_BEFORE);
>         after.setRowKind(RowKind.UPDATE_AFTER);
>         out.collect(before);
>         out.collect(after);
> {code}
> When a field in "old" has a null value, it is overwritten by the new record's value. That leads the aggregation to a wrong result.
>
> I tried to fix this bug with the following logic: for each field, use the old value whenever the "old" row contains that field, whether its value is null or not; otherwise use the new value.
> I hope this bug will be fixed in a future version.
> [^canal_format.patch]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
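The failure mode and the proposed fix can be sketched outside of Flink. This is an illustrative sketch only, not the attached patch: plain `Map`s stand in for Flink's `GenericRowData`, since a JSON object (unlike a fixed-width row) can distinguish "key absent" from "key present with a null value", which is exactly the distinction the `isNullAt` check loses. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CanalOldMergeSketch {

    // Buggy merge (mirrors the isNullAt check in the quoted code): any null
    // slot in "old" is treated as "unchanged" and overwritten with the new value.
    static Map<String, Object> buggyBefore(Map<String, Object> after, Map<String, Object> old) {
        Map<String, Object> before = new HashMap<>(old);
        for (Map.Entry<String, Object> e : after.entrySet()) {
            if (before.get(e.getKey()) == null) {  // cannot tell "absent" from "was NULL"
                before.put(e.getKey(), e.getValue());
            }
        }
        return before;
    }

    // Proposed merge: take a field from "old" whenever the key is present,
    // even if its previous value was null; fall back to the new value otherwise.
    static Map<String, Object> fixedBefore(Map<String, Object> after, Map<String, Object> old) {
        Map<String, Object> before = new HashMap<>(after);
        for (String key : old.keySet()) {
            before.put(key, old.get(key));         // null is a legitimate old value
        }
        return before;
    }

    public static void main(String[] args) {
        // update order_table set order_no=123, status=3 where id=1;
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1);
        after.put("ymd", 20201212);
        after.put("order_no", 123);
        after.put("status", 3);

        // Canal's "old" lists only the changed fields with their previous values;
        // order_no was NULL before the update.
        Map<String, Object> old = new HashMap<>();
        old.put("order_no", null);
        old.put("status", 1);

        System.out.println(buggyBefore(after, old).get("order_no")); // 123 (wrong -U row)
        System.out.println(fixedBefore(after, old).get("order_no")); // null (correct -U row)
    }
}
```

Running the sketch reproduces the reported wrong update-before row `-U(1,20201212,123,1)` under the buggy merge, while the presence-based merge yields the correct `-U(1,20201212,null,1)`.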