[ https://issues.apache.org/jira/browse/FLINK-19452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17204488#comment-17204488 ]
Zhengchao Shi edited comment on FLINK-19452 at 9/30/20, 6:40 AM:
-----------------------------------------------------------------

[~jark] I started the job from `latest-offset`, so Flink holds no aggregate state for the key (see 'org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction'). The changelog contains only -U and +U messages, never the +I messages for the rows that already existed. As a result, when update events arrive, the count alternates between 1 and 0. For example:

First update:
update orders set product_id = 101 where order_number = 10001 and product_id = 100;
-U(10001, 100) -> ignored: there is no state for the key and this is a retract message, so GroupAggFunction#processElement simply returns
+U(10001, 101) -> +I(10001, 1)

Second update:
update orders set product_id = 102 where order_number = 10001 and product_id = 105;
-U(10001, 105) -> -D(10001, 1)
+U(10001, 102) -> +I(10001, 1)

> statistics of group by CDC data is always 1
> -------------------------------------------
>
>                 Key: FLINK-19452
>                 URL: https://issues.apache.org/jira/browse/FLINK-19452
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.11.1
>            Reporter: Zhengchao Shi
>            Priority: Major
>             Fix For: 1.12.0
>
>
> When using CDC to compute count statistics, if only updates are made to the source table (a MySQL table), the value of the count is always 1.
> {code:sql}
> CREATE TABLE orders (
>   order_number int,
>   product_id int
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'Topic',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'GroupId',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json'
> );
>
> CREATE TABLE order_test (
>   order_number int,
>   order_cnt bigint
> ) WITH (
>   'connector' = 'print'
> );
>
> INSERT INTO order_test
> SELECT order_number, count(1) FROM orders GROUP BY order_number;
> {code}
> There are 3 records in "orders":
> ||order_number||product_id||
> |10001|1|
> |10001|2|
> |10001|3|
> Now update the orders table:
> {code:sql}
> update orders set product_id = 5 where order_number = 10001;
> {code}
> The output is:
> -D(10001,1)
> +I(10001,1)
> -D(10001,1)
> +I(10001,1)
> -D(10001,1)
> +I(10001,1)
> I think the final result should be +I(10001, 3).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
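The retraction behaviour described in the comment can be replayed with a small standalone model. This is a hypothetical sketch, not Flink code: the class `RetractCountSketch`, its `Kind` enum and its `processElement` method are illustrative names. It assumes the operator always emits a -U/+U pair when an existing count changes; the real GroupAggFunction's output also depends on its configuration (for example, whether update-before messages are generated).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of retraction handling in a per-key COUNT
 * aggregation. It mirrors the behaviour described for
 * GroupAggFunction#processElement in the comment, but it is NOT Flink code.
 */
public class RetractCountSketch {

    /** Changelog row kinds, printed in Flink's short-string notation. */
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }

        boolean isRetract() {
            return this == UPDATE_BEFORE || this == DELETE;
        }
    }

    // Per-key count; a missing entry stands for "no state for this key".
    private final Map<Integer, Long> countState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    private void emit(Kind kind, int key, long count) {
        output.add(kind.shortString + "(" + key + "," + count + ")");
    }

    void processElement(Kind kind, int key) {
        Long prev = countState.get(key);
        if (prev == null && kind.isRetract()) {
            // No accumulator for this key and a retract message: drop it.
            return;
        }
        boolean firstRow = prev == null;
        long before = firstRow ? 0L : prev;
        long after = kind.isRetract() ? before - 1 : before + 1;

        if (after > 0) {
            if (firstRow) {
                emit(Kind.INSERT, key, after);          // first result for this key
            } else {
                emit(Kind.UPDATE_BEFORE, key, before);  // retract the previous result
                emit(Kind.UPDATE_AFTER, key, after);    // emit the new result
            }
            countState.put(key, after);
        } else {
            // The last counted record was retracted: delete the result and clear state.
            emit(Kind.DELETE, key, before);
            countState.remove(key);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        // Started from latest-offset: empty state, then two UPDATEs on key 10001.
        RetractCountSketch noHistory = new RetractCountSketch();
        for (int i = 0; i < 2; i++) {
            noHistory.processElement(Kind.UPDATE_BEFORE, 10001);
            noHistory.processElement(Kind.UPDATE_AFTER, 10001);
        }
        System.out.println(noHistory.output());
        // prints [+I(10001,1), -D(10001,1), +I(10001,1)]

        // Same kind of UPDATE, but the three original INSERTs were consumed first.
        RetractCountSketch withHistory = new RetractCountSketch();
        for (int i = 0; i < 3; i++) {
            withHistory.processElement(Kind.INSERT, 10001);
        }
        withHistory.processElement(Kind.UPDATE_BEFORE, 10001);
        withHistory.processElement(Kind.UPDATE_AFTER, 10001);
        System.out.println(withHistory.output());
    }
}
```

With empty state the first -U is dropped, so the count can only ever reach 1 and then fall back to 0. When the original +I messages are seen first, the update moves the count 3 -> 2 -> 3, ending at the value the reporter expected.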