1、试了下
在test表中增加一个proctime
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`proctime` AS PROCTIME(),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'no_lock',
'password' = 'no_lock',
'database-name' = 'ai_audio_lyric_task',
'table-name' = 'test',
'debezium.snapshot.locking.mode' = 'none'
);
写去重语句,
INSERT into test_status_print
SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as
rowNum
FROM (
SELECT t.* , s.name as status_name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id
)
)r WHERE rowNum = 1;
但提示报错,不支持:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Deduplicate doesn't support
consuming update and delete changes which is produced by node
Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
--
Sent from: http://apache-flink.147419.n8.nabble.com/