1. I gave it a try.

I added a proctime column to the test table:

CREATE TABLE test (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  `proctime` AS PROCTIME(),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'no_lock',
  'password' = 'no_lock',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test',
  'debezium.snapshot.locking.mode' = 'none'
);

Then I wrote a deduplication query:

INSERT INTO test_status_print
SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) AS rowNum
        FROM (
                SELECT t.*, s.name AS status_name
                FROM test AS t
                LEFT JOIN status AS s ON t.status = s.id
        )
) r WHERE rowNum = 1;

But it fails with an error saying this is not supported:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Deduplicate doesn't support
consuming update and delete changes which is produced by node
Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
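
A possible workaround, untested on this setup: the error comes from the Deduplicate operator, which the planner picks when ROW_NUMBER() is ordered by a time attribute such as proctime, and which only accepts insert-only input. As far as I understand, ordering by a regular column instead makes the planner fall back to the general Top-N (Rank) operator, which can consume the update and delete changes produced by the join. The sketch below assumes that behaviour and uses the existing `time` column as the ordering key; that choice is an assumption of this example, not something from the original query.

```sql
-- Sketch only, not verified against this Flink version.
-- Assumption: `time` is a plain TIMESTAMP(3) column (no watermark declared),
-- so ordering by it should plan a Rank (Top-N) node rather than Deduplicate.
INSERT INTO test_status_print
SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY `time` ASC) AS rowNum
        FROM (
                SELECT t.*, s.name AS status_name
                FROM test AS t
                LEFT JOIN status AS s ON t.status = s.id
        )
) r WHERE rowNum = 1;
```

Note that the semantics change: this keeps the row with the smallest `time` per id rather than the first row by processing time, and the result is an updating stream, so test_status_print would need to accept update and delete changes.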



