抱歉... 题目没有看仔细,才发现你说的是 es sink,那和我上面说的 bug 不是一个问题。

不过从理论分析,不应该出现这个现象。
我在本地1.11分支上,用你给的数据和 sql,也没有复现你说的问题。
是不是 sql 给的不对?我看你 test_status 表的定义在 pk 之前少了一个逗号..

Best,
Jark

On Sat, 14 Nov 2020 at 17:48, Jark Wu <[email protected]> wrote:

> 看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423
> 这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。
> 这个bug 会在即将发布的 1.11.3 中修复。
>
> Best,
> Jark
>
>
>
>
> On Fri, 13 Nov 2020 at 13:12, jindy_liu <[email protected]> wrote:
>
>> 源表test:
>> CREATE TABLE test (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'test'
>> )
>> 源表status
>> CREATE TABLE status (
>> `id` INT,
>> `name` VARCHAR(255),
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'status'
>> );
>>
>> 输出表
>> CREATE TABLE test_status (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> `status_name` VARCHAR(255)
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'xxx',
>>   'index' = 'xxx',
>>   'username' = 'xxx',
>>   'password' = 'xxx',
>>   'sink.bulk-flush.backoff.max-retries' = '100000',
>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>>   'sink.bulk-flush.max-actions' = '5000',
>>   'sink.bulk-flush.max-size' = '10mb',
>>   'sink.bulk-flush.interval' = '1s'
>> );
>>
>>
>> 输出语句:
>> INSERT into test_status
>> SELECT t.*, s.name
>> FROM test AS t
>> LEFT JOIN status AS s ON t.status = s.id;
>>
>> mysql表中已经有数据
>> test:
>> 0, name0, 2020-07-06 00:00:00 , 0
>> 1, name1, 2020-07-06 00:00:00 , 1
>> 2, name2, 2020-07-06 00:00:00 , 1
>> .....
>>
>> status
>> 0, status0
>> 1, status1
>> 2, status2
>> .....
>>
>> 操作顺序与复现:
>> 1、启动任务,设置并行度为40,
>> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
>> savepoint保存,然后web ui上取消任务。
>>   ==> test_status中的数据正常:
>>     0, name0, 2020-07-06 00:00:00 , 0, status0
>>     1, name1, 2020-07-06 00:00:00 , 1, status1
>>     2, name2, 2020-07-06 00:00:00 , 1, status1
>>
>> 2、操作mysql, 将status中id=1数据变更为 status1_modify
>>
>> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
>> job  下,
>>   ==> test_status中的数据正常:
>>     0, name0, 2020-07-06 00:00:00 , 0, status0
>>     1, name1, 2020-07-06 00:00:00 , 1, status1_modify
>>     2, name2, 2020-07-06 00:00:00 , 1, status1_modify
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
>> job  下
>>   ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
>>     0, name0, 2020-07-06 00:00:00 , 0, status0
>>
>>
>> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!
>>
>> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
>> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>

回复