Hi Francis,

I think requiring a primary key for the versioned table [1] used in a temporal
join [2] is expected. Could you double-check which table serves as the
versioned table in this case: is it the streaming table from RabbitMQ, or the
joined data?
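For reference, this minimal sketch shows the two things the versioned-table docs [1] ask for on a table read from a Debezium changelog: a PRIMARY KEY and an event-time attribute declared with a WATERMARK. It follows the pattern in those docs. The table name a_changelog, its columns, the Kafka topic and the broker address are hypothetical placeholders, not the actual setup in this thread.

```sql
-- Hypothetical changelog table: table/column names, topic and broker
-- address are placeholders for illustration only.
CREATE TABLE a_changelog (
  id          STRING,
  name        STRING,
  -- Debezium change timestamp, exposed via format metadata
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  -- (1) primary key: identifies which rows are versions of the same entity
  PRIMARY KEY (id) NOT ENFORCED,
  -- (2) event-time attribute: orders those versions in time
  WATERMARK FOR update_time AS update_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'a',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
```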

Best,
Yun



[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/versioned_tables/
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins




 ------------------Original Mail ------------------
Sender: Francis Conroy <francis.con...@switchdin.com>
Send Date: Thu Feb 17 11:27:01 2022
Recipients: user <user@flink.apache.org>
Subject: Flink 1.15 deduplication view and lookup join

Hi user group, 

I'm currently using Flink 1.15 (we're waiting for it to be released) to build
some streaming pipelines, and I'm trying to do a temporal lookup join.

I've defined several tables (all with primary keys) that are populated by
Debezium CDC data; let's call them a, b and c.

I've defined a view that joins all three tables to produce rows of
hierarchical association data, as in the diagram.

This all works fine so far.
I'm now trying to join this view with a table created from a DataStream, using
a lookup join
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join),
as follows:

I've now added a time field to both tables, and I'm getting the following
validation exception:

Temporal Table Join requires primary key in versioned table, but no primary key can be found.
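For context, this sketch shows the kind of event-time temporal join (FOR SYSTEM_TIME AS OF) that performs this check: the right-hand side must be a versioned table, and the join condition must include its primary key. The table readings and its columns reading and event_time are hypothetical stand-ins for the DataStream-derived table, and event_time is assumed to be an event-time attribute. The right-hand side uses the versioned_endpoint_association view defined further down in this mail.

```sql
-- Event-time temporal join: each reading is enriched with the version of the
-- endpoint association that was valid at the reading's event time.
-- readings, reading and event_time are hypothetical names.
SELECT r.device_id,
       r.reading,
       e.ep_uuid,
       e.unit_uuid
FROM readings AS r
JOIN versioned_endpoint_association FOR SYSTEM_TIME AS OF r.event_time AS e
  -- the equality condition must cover the versioned table's primary key
  ON r.device_id = e.device_id;
```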

I then implemented another view on top of the joined data, using the
deduplication query
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/#deduplication).

Here is my view definition: 
CREATE VIEW versioned_endpoint_association AS
SELECT device_id,
       leg_dt_id,
       ldt_id,
       ep_uuid,
       unit_uuid,
       pf_uuid,
       update_time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY device_id
           ORDER BY update_time DESC) as rownum
      FROM endpoint_association)
WHERE rownum = 1;
Even after all these steps I cannot get the temporal join to work. Am I missing
some detail that tells Flink that versioned_endpoint_association should in
fact be interpreted as a versioned table?

Looking at the log, it seems important that there is a LogicalRank node that
can be converted to a Deduplicate node, but the conversion isn't happening.
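One possible explanation, offered tentatively: the deduplication docs require the ORDER BY column to be a time attribute. When it is an ordinary TIMESTAMP, the query is planned as a general Top-N rank rather than a Deduplicate, and the view is then likely not recognized as versioned. If endpoint_association is the output of a regular join over the CDC tables, update_time there is probably no longer a time attribute. This sketch shows deduplication over a source whose update_time is declared as an event-time attribute. The names endpoint_association_log and versioned_endpoint_log, the column subset and the connector options are all hypothetical.

```sql
-- Hypothetical append-only source: update_time is declared as an event-time
-- attribute via WATERMARK, which the deduplication ORDER BY requires.
CREATE TABLE endpoint_association_log (
  device_id   STRING,
  ep_uuid     STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time
) WITH (
  'connector' = 'kafka',
  'topic' = 'endpoint_association_log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Keep-last-row deduplication ordered by the event-time attribute; with this
-- shape Flink can infer device_id (the PARTITION BY key) as the view's key.
CREATE VIEW versioned_endpoint_log AS
SELECT device_id, ep_uuid, update_time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY device_id
                            ORDER BY update_time DESC) AS rownum
  FROM endpoint_association_log)
WHERE rownum = 1;
```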




This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email and 
delete it from your system. You may not use, disseminate, distribute or copy 
this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia
