问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
String kafka = "CREATE TABLE `电话` (    `rowid` 
VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` 
VARCHAR(2147483647),`63fd660536521f81a2cfabad` 
VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH ( 'connector' 
= 'kafka', 'topic' = 
'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', 
'properties.bootstrap.servers' = '132.232.27.116:9092', 'scan.startup.mode' = 
'earliest-offset', 'format' = 'debezium-json' )";

String mysql = "CREATE TABLE `电话_1` (    `rowID` VARCHAR(255),`名称` STRING,`手机` 
STRING,`座机` VARCHAR(255),    PRIMARY KEY (`rowID`) NOT ENFORCED  )  WITH (    
'connector' = 'jdbc',    'driver' = 'com.mysql.cj.jdbc.Driver',    'url' = 
'jdbc:mysql://43.136.128.102:6506/meihua_test',    'username' = 'root',    
'password' = '123456',    'table-name' = '电话2'  )";

String insert = "insert into `电话_1` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` 
as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( select `rowid` as 
`rowID`,`63fd65fb36521f81a2cfab90` as `名称`,`63fd660536521f81a2cfabad` as 
`手机`,`63fd660536521f81a2cfabae` as `座机` from `电话` ) as t_1";

操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?

回复