Sorry... I didn't read the question carefully enough; I only just noticed you said es sink, so that is a different problem from the bug I mentioned above.
From a theoretical analysis, though, this phenomenon should not occur. On my
local 1.11 branch, using the data and SQL you provided, I could not reproduce
the problem you describe either. Is the SQL you posted perhaps not what you
actually ran? I notice the definition of your test_status table is missing a
comma before the pk..

Best,
Jark

On Sat, 14 Nov 2020 at 17:48, Jark Wu <[email protected]> wrote:

> This looks like it is caused by this jdbc sink bug:
> https://issues.apache.org/jira/browse/FLINK-19423
> That bug makes the sink pick the wrong pk index when deleting, which can
> lead to an index exception or to deleting the wrong data.
> It will be fixed in the upcoming 1.11.3 release.
>
> Best,
> Jark
>
>
> On Fri, 13 Nov 2020 at 13:12, jindy_liu <[email protected]> wrote:
>
>> Source table test:
>> CREATE TABLE test (
>>   `id` INT,
>>   `name` VARCHAR(255),
>>   `time` TIMESTAMP(3),
>>   `status` INT,
>>   PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'test'
>> )
>>
>> Source table status:
>> CREATE TABLE status (
>>   `id` INT,
>>   `name` VARCHAR(255),
>>   PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'status'
>> );
>>
>> Sink table:
>> CREATE TABLE test_status (
>>   `id` INT,
>>   `name` VARCHAR(255),
>>   `time` TIMESTAMP(3),
>>   `status` INT,
>>   `status_name` VARCHAR(255)
>>   PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'xxx',
>>   'index' = 'xxx',
>>   'username' = 'xxx',
>>   'password' = 'xxx',
>>   'sink.bulk-flush.backoff.max-retries' = '100000',
>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>>   'sink.bulk-flush.max-actions' = '5000',
>>   'sink.bulk-flush.max-size' = '10mb',
>>   'sink.bulk-flush.interval' = '1s'
>> );
>>
>> Insert statement:
>> INSERT into test_status
>> SELECT t.*, s.name
>> FROM test AS t
>> LEFT JOIN status AS s ON t.status = s.id;
>>
>> The mysql tables already contain data:
>> test:
>> 0, name0, 2020-07-06 00:00:00 , 0
>> 1, name1, 2020-07-06 00:00:00 , 1
>> 2, name2, 2020-07-06 00:00:00 , 1
>> .....
>>
>> status:
>> 0, status0
>> 1, status1
>> 2, status2
>> .....
>>
>> Steps to reproduce:
>> 1. Start the job with parallelism 40. After the existing rows have been
>> processed, take a savepoint with
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink savepoint, then
>> cancel the job in the web UI.
>> ==> the data in test_status is correct:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>> 1, name1, 2020-07-06 00:00:00 , 1, status1
>> 2, name2, 2020-07-06 00:00:00 , 1, status1
>>
>> 2. In mysql, change the status row with id=1 to status1_modify.
>>
>> 3. Restart the job from the savepoint with different parallelism. With
>> parallelism 1 the result matches expectations; with parallelism greater
>> than 1 it does not.
>> With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1
>> job:
>> ==> the data in test_status is correct:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>> 1, name1, 2020-07-06 00:00:00 , 1, status1_modify
>> 2, name2, 2020-07-06 00:00:00 , 1, status1_modify
>> With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40
>> job:
>> ==> the data in test_status is wrong; the two rows with id = 1 and id = 2
>> are missing:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>>
>>
>> I suspect that when the CDC changelog is written out by multiple parallel
>> sink subtasks, the insert for id=1 is applied first and the delete for
>> id=1 afterwards, which corrupts the data!!!
>>
>> Is this a bug? Or is the operator state broken when restoring from the
>> savepoint?
>> If so, is it possible to set the parallelism of just the sink to 1??
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
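The reordering suspected in the last message can be sketched in a few lines. This is a hypothetical illustration in plain Python, not Flink or connector code: the changelog encoding and the `apply_changelog` helper are invented for the example. The idea is that an update which the join emits as a DELETE of the old joined row followed by an INSERT of the new joined row is harmless when applied in order, but if the two records reach an upsert-style sink through different parallel channels and the delete is applied after the insert, the key disappears entirely, matching the missing id=1 and id=2 rows.

```python
def apply_changelog(events, state):
    """Apply (op, key, value) changelog events to an upsert-style state dict."""
    for op, key, value in events:
        if op == "+I":
            state[key] = value          # insert/upsert the new row
        elif op == "-D":
            state.pop(key, None)        # delete the row for this key
    return state

# Changelog produced by updating status1 -> status1_modify for the row id=1.
delete_old = ("-D", 1, ("name1", "status1"))
insert_new = ("+I", 1, ("name1", "status1_modify"))

# Applied in order (parallelism 1): the row survives with the new value.
ordered = apply_changelog([delete_old, insert_new], {})
print(ordered)    # {1: ('name1', 'status1_modify')}

# Reordered (delete overtakes the insert across parallel channels):
# the row vanishes from the sink.
reordered = apply_changelog([insert_new, delete_old], {})
print(reordered)  # {}
```

Whether this reordering can actually happen depends on how records are partitioned between the join and the sink; if the exchange hashes on something other than the sink's primary key, records for the same key can land on different subtasks.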
