View A does the de-duplication using event time, which will still produce update rows. If you use processing time for the de-duplication instead, then view A should only produce append-only rows.
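For example, something like the following sketch (not tested; the added `proc_time` computed column and the modified source DDL are my assumptions, not part of your original code):

```
-- add a processing-time attribute to the source table
CREATE TEMPORARY TABLE `source` (
 id INT,
 name STRING,
 event_time TIMESTAMP(3),
 proc_time AS PROCTIME(),
 WATERMARK FOR event_time AS event_time
) WITH (
 'connector' = 'datagen'
);

-- keep-first-row deduplication ordered by the processing-time attribute;
-- FirstRow on proctime should let the Deduplicate node emit append-only rows
create TEMPORARY view A as
select id, name, event_time from (
 select id, name, event_time,
 row_number() over(partition by id, name, event_time order by proc_time asc) as rn
 from source
)
where rn = 1;
```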

Best regards,
Yuxia



> On Oct 15, 2022, at 9:50 AM, liebin...@whu.edu.cn wrote:
> 
> I had a problem with Interval Join after using Deduplicate. I'm using Flink 
> version 1.15.
> 
> I want to use Flink's Interval Join to join two streams, and my first table 
> needs to be de-duplicated. Here is my sample code.
> 
> ```
> CREATE TEMPORARY TABLE `source` (
>  id INT,
>  name STRING,
>  event_time TIMESTAMP(3),
>  WATERMARK FOR event_time AS event_time
> ) WITH (
>  'connector' = 'datagen'
> );
> 
> 
> CREATE TEMPORARY TABLE B (
>  id INT,
>  `start` INT,
>  `end` INT,
>  event_time TIMESTAMP(3),
>  WATERMARK FOR event_time AS event_time
> ) WITH (
>  'connector' = 'datagen'
> );
> 
> create TEMPORARY view A as
> select id, name, event_time from (
>  select id, name, event_time,
>  row_number() over(partition by id, name, event_time order by event_time asc) as rn
>  from source
> )
> where rn = 1;
> 
> SELECT *
> FROM A, B
> WHERE 
>    A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND 
>    A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND 
>    B.event_time + INTERVAL '10' SECOND;
> ```
> 
> I keep only the first row in the de-duplication, so view A should only 
> produce insert rows, but running the SQL above produces the following error.
> 
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
> support consuming update and delete changes which is produced by node 
> Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
> ```
> 
> How can I perform an Interval Join after using Deduplicate?
