View A does the de-duplication using event time, which will still produce update rows. If you use proc time for the de-duplication instead, then view A should produce only append-only rows, which the interval join can consume.
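For illustration, a minimal sketch of what a proc-time-based deduplication could look like (the extra `proc_time` computed column and its name are assumptions, not part of your original DDL):

```
-- Sketch only: add a processing-time attribute to the source table and
-- order the deduplication by it, so view A emits append-only rows.
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),               -- assumed name for the proc-time column
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
         ROW_NUMBER() OVER (
           PARTITION BY id, name, event_time
           ORDER BY proc_time ASC         -- order by proc time instead of event time
         ) AS rn
  FROM source
)
WHERE rn = 1;
```

Keeping the first row ordered by proc time makes the Deduplicate node insert-only, so the downstream interval join no longer receives update or delete changes.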
Best regards,
Yuxia

> On Oct 15, 2022, at 9:50 AM, liebin...@whu.edu.cn wrote:
>
> I had a problem with Interval Join after using Deduplicate. I'm using Flink
> version 1.15.
>
> I want to use Flink's Interval Join for double-stream association, and my
> first table needs to be de-duplicated. Here is my sample code.
>
> ```
> CREATE TEMPORARY TABLE `source` (
>   id INT,
>   name STRING,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
> CREATE TEMPORARY TABLE B (
>   id INT,
>   `start` INT,
>   `end` INT,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
> CREATE TEMPORARY VIEW A AS
> SELECT id, name, event_time FROM (
>   SELECT id, name, event_time,
>          ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY event_time ASC) AS rn
>   FROM source
> )
> WHERE rn = 1;
>
> SELECT *
> FROM A, B
> WHERE
>   A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND
>   A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND
>   B.event_time + INTERVAL '10' SECOND;
> ```
>
> I keep only the first row of each group for the de-duplication, so view A
> should produce only insert rows, but running the SQL above produces the
> following error.
>
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't
> support consuming update and delete changes which is produced by node
> Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
> ```
>
> How can I perform an Interval Join after using Deduplicate?