Jark Wu created FLINK-20374:
-------------------------------

             Summary: Wrong result when shuffling changelog stream on non-primary-key columns
                 Key: FLINK-20374
                 URL: https://issues.apache.org/jira/browse/FLINK-20374
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Jark Wu
This was reported on the user-zh ML: http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html

{code:sql}
CREATE TABLE test (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
);

CREATE TABLE status (
  `id` INT,
  `name` VARCHAR(255),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

-- output
CREATE TABLE test_status (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  `status_name` VARCHAR(255),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '100000',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);

INSERT INTO test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
{code}

Data in the MySQL tables:

{code}
test:
0, name0, 2020-07-06 00:00:00, 0
1, name1, 2020-07-06 00:00:00, 1
2, name2, 2020-07-06 00:00:00, 1
.....

status:
0, status0
1, status1
2, status2
.....
{code}

Operations:

1. Start the job with parallelism=40; the result in the test_status sink is correct:
{code}
0, name0, 2020-07-06 00:00:00, 0, status0
1, name1, 2020-07-06 00:00:00, 1, status1
2, name2, 2020-07-06 00:00:00, 1, status1
{code}
2. Update the {{status}} of the {{id=2}} record in table {{test}} from {{1}} to {{2}}.
3. The result is now incorrect: the {{id=2}} record is missing from the result. The reason is that the changelog of {{test}} is shuffled on the {{status}} column, which is not the primary key.
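The effect can be illustrated with a minimal Python sketch. This is a hypothetical simulation of the shuffle, not Flink code; the task count, the hash function, and the event encoding are assumptions for illustration only:

```python
# Hypothetical simulation (not Flink code): the UPDATE on id=2 is encoded
# as a retraction pair in the changelog of table `test`.
events = [
    ("-U", {"id": 2, "status": 1}),  # UPDATE_BEFORE: retract the old row
    ("+U", {"id": 2, "status": 2}),  # UPDATE_AFTER: emit the new row
]

NUM_TASKS = 40  # assumed parallelism, matching the job above


def partition(row, key_column):
    # The join on t.status = s.id hash-partitions the `test` side by `status`.
    return hash(row[key_column]) % NUM_TASKS


# Shuffling on the non-primary-key column sends the two halves of the
# update to different join subtasks:
tasks = {op: partition(row, "status") for op, row in events}
assert tasks["-U"] != tasks["+U"]

# Each subtask forwards its output to the sink independently, so the sink
# (keyed by id) may see the DELETE for id=2 *after* the new UPSERT for id=2.
arrival = [("UPSERT", 2), ("DELETE", 2)]  # one possible interleaving
sink_state = {}
for op, key in arrival:
    if op == "UPSERT":
        sink_state[key] = "name2 / status2"
    else:
        sink_state.pop(key, None)

print(sink_state)  # {} -> the id=2 record has been wrongly removed
```

Because the two halves of the update travel through different subtasks, nothing synchronizes their arrival order at the sink.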
Therefore, the ordering can't be guaranteed, and the result is wrong.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)