[ 
https://issues.apache.org/jira/browse/FLINK-19452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17204488#comment-17204488
 ] 

Zhengchao Shi edited comment on FLINK-19452 at 9/30/20, 6:43 AM:
-----------------------------------------------------------------

[~jark] Because I started the job from `latest-offset`, Flink holds no state 
for the rows that already exist in the source table (see 
'org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction'). When 
update events arrive, the count flips between 1 and 0, because the changelog 
contains only -U and +U messages. For example:

First update: update orders set product_id = 101 where order_number = 10001 
and product_id = 100;
-U(10001, 100) -> ignored: there is no state for this key and the message is a 
retraction, so GroupAggFunction#processElement simply returns
+U(10001, 101) -> +I(10001, 1)

Second update: update orders set product_id = 102 where order_number = 10001 
and product_id = 105;
-U(10001, 105) -> -D(10001, 1)
+U(10001, 102) -> +I(10001, 1)

This is why the result is always 1.
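
The sequence above can be reproduced with a small simulation. This is only a rough sketch of the behaviour described in this comment, not Flink's actual code: the function name `count_by_key` and the tuple layout of the events are hypothetical, and the per-key count is modelled as a plain dictionary.

```python
from typing import Dict, List, Optional, Tuple

# (kind, order_number, product_id); kind is one of "+I", "+U", "-U", "-D"
Event = Tuple[str, int, int]


def count_by_key(events: List[Event],
                 state: Optional[Dict[int, int]] = None) -> List[str]:
    """Simulate COUNT(1) ... GROUP BY order_number over a changelog.

    `state` maps order_number -> current count. An empty state models a
    job started from `latest-offset`.
    """
    state = {} if state is None else state
    out: List[str] = []
    for kind, key, _product_id in events:
        old = state.get(key)
        if kind in ("-U", "-D"):
            if old is None:
                # Retraction for a key with no state: dropped silently.
                continue
            new = old - 1
            if new == 0:
                # Count reached zero: emit a delete and clear the state.
                del state[key]
                out.append(f"-D({key},{old})")
                continue
        else:
            new = (old or 0) + 1
            if old is None:
                # First row seen for this key: emit an insert.
                state[key] = new
                out.append(f"+I({key},{new})")
                continue
        # Existing key with a non-zero count: emit an update pair.
        state[key] = new
        out.append(f"-U({key},{old})")
        out.append(f"+U({key},{new})")
    return out


# The two updates from this comment.
updates: List[Event] = [
    ("-U", 10001, 100), ("+U", 10001, 101),
    ("-U", 10001, 105), ("+U", 10001, 102),
]

# No prior state, as when starting from `latest-offset`.
print(count_by_key(updates))

# Assumed prior state of 3 rows, as if the snapshot had been read first.
full_state = {10001: 3}
print(count_by_key(updates, full_state), full_state)
```

With an empty state the count never gets past 1, while with the three existing rows already counted the same events leave the count at 3.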


> statistics of group by CDC data is always 1
> -------------------------------------------
>
>                 Key: FLINK-19452
>                 URL: https://issues.apache.org/jira/browse/FLINK-19452
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.11.1
>            Reporter: Zhengchao Shi
>            Priority: Major
>             Fix For: 1.12.0
>
>
> When using CDC to compute count statistics, if only updates are made to the 
> source table (a MySQL table), the value of count is always 1.
> {code:sql}
> CREATE TABLE orders (
>   order_number int,
>   product_id   int
> ) with (
>   'connector' = 'kafka-0.11',
>   'topic' = 'Topic',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'GroupId',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json'
> );
> CREATE TABLE order_test (
>   order_number int,
>   order_cnt bigint
> ) WITH (
>   'connector' = 'print'
> );
> INSERT INTO order_test
> SELECT order_number, count(1) FROM orders GROUP BY order_number;
> {code}
> There are 3 records in "orders":
> ||order_number||product_id||
> |10001|1|
> |10001|2|
> |10001|3|
> Now update the orders table:
> {code:sql}
> update orders set product_id = 5 where order_number = 10001;
> {code}
> The output is:
> -D(10001,1)
> +I(10001,1)
> -D(10001,1)
> +I(10001,1)
> -D(10001,1)
> +I(10001,1)
> I think the final result should be +I(10001, 3).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
