[ https://issues.apache.org/jira/browse/FLINK-20374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timo Walther updated FLINK-20374:
---------------------------------
    Labels:   (was: stale-critical)

> Wrong result when shuffling changelog stream on non-primary-key columns
> -----------------------------------------------------------------------
>
>                 Key: FLINK-20374
>                 URL: https://issues.apache.org/jira/browse/FLINK-20374
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> This is reported from user-zh ML:
> http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html
> {code:sql}
> CREATE TABLE test (
>   `id` INT,
>   `name` VARCHAR(255),
>   `time` TIMESTAMP(3),
>   `status` INT,
>   PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> );
> CREATE TABLE status (
>   `id` INT,
>   `name` VARCHAR(255),
>   PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
> -- output
> CREATE TABLE test_status (
>   `id` INT,
>   `name` VARCHAR(255),
>   `time` TIMESTAMP(3),
>   `status` INT,
>   `status_name` VARCHAR(255),
>   PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '100000',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
> INSERT INTO test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
> {code}
> Data in the MySQL tables:
> {code}
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .....
> status:
> 0, status0
> 1, status1
> 2, status2
> .....
> {code}
> Operations:
> 1. Start the job with parallelism=40. The result in the test_status sink is correct:
> {code}
> 0, name0, 2020-07-06 00:00:00 , 0, status0
> 1, name1, 2020-07-06 00:00:00 , 1, status1
> 2, name2, 2020-07-06 00:00:00 , 1, status1
> {code}
> 2. Update {{status}} of the {{id=2}} record in table {{test}} from {{1}} to {{2}}.
> 3. The result is not correct because the {{id=2}} record is missing in the
> result.
> The reason is that the planner shuffles the changelog of {{test}} on the
> {{status}} column, which is not the primary key. Therefore, the ordering of
> changes for the same primary key can't be guaranteed, and the result is wrong.
> The {{-U[2, name2, 2020-07-06 00:00:00 , 1]}} and {{+U[2, name2, 2020-07-06
> 00:00:00 , 2]}} may be shuffled to different join tasks, so the order of the
> joined results is not guaranteed when they arrive at the sink task.
> It is possible that {{+U[2, name2, 2020-07-06 00:00:00 , status2]}} arrives
> first, followed by {{-U[2, name2, 2020-07-06 00:00:00 , status1]}}. The sink
> then applies the late {{-U}} as a delete on the same key, so the {{id=2}}
> record is missing in Elasticsearch.
> It seems that we need a changelog ordering mechanism in the planner.
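
The reordering described above can be replayed outside Flink with a small simulation. This is only a sketch, not Flink code: `join_task_for`, `join` and `apply_to_sink` are hypothetical helpers, the modulo routing stands in for Flink's real key-group hashing, and the sink is modeled as a plain dict keyed on `id` that treats `-U` as a delete.

```python
# Simulation sketch (not Flink code): replays the changelog for id=2
# through hypothetical join tasks and an upsert sink keyed on id.

STATUS_NAMES = {0: "status0", 1: "status1", 2: "status2"}


def join_task_for(status, parallelism=40):
    """Hypothetical shuffle: route a row by its join key (status)."""
    return status % parallelism


def join(change):
    """Enrich one changelog row with the status name, keeping its kind."""
    kind, (row_id, name, ts, status) = change
    return kind, (row_id, name, ts, status, STATUS_NAMES.get(status))


def apply_to_sink(sink, changes):
    """Upsert sink keyed on id: +I/+U write the row, -U/-D delete the key."""
    for kind, row in changes:
        if kind in ("+I", "+U"):
            sink[row[0]] = row
        else:
            sink.pop(row[0], None)
    return sink


ts = "2020-07-06 00:00:00"
update_before = ("-U", (2, "name2", ts, 1))
update_after = ("+U", (2, "name2", ts, 2))

# The two halves of one update carry different join keys, so they are
# routed to different join tasks and lose their relative order.
tasks = (join_task_for(1), join_task_for(2))

initial = {2: (2, "name2", ts, 1, "status1")}

# Order preserved: the retraction arrives first, then the new row.
in_order = apply_to_sink(dict(initial), [join(update_before), join(update_after)])

# Order swapped between tasks: the late -U deletes the fresh row.
swapped = apply_to_sink(dict(initial), [join(update_after), join(update_before)])

print(tasks)
print(in_order)
print(swapped)
```

In the swapped case the sink ends up empty for `id=2`, which matches the missing record reported in step 3. If both changes were routed by the primary key `id`, they would pass through the same task and keep their order.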