Dear List,

I have trouble implementing a join between two streaming tables in the Python
Table API.

Each record of the left table should be enriched with the most recent value of
the right table. The right table is updated only rarely (perhaps every 15
minutes). With my current implementation, the joined table only produces
output when the right table changes. Instead, I want an output row every time
a record arrives on the left side, enriched with the latest value from the
right side. I have not found a way to achieve this.
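
To make the desired behavior concrete, here is a minimal plain-Python sketch of
the semantics I am after. It is not Flink code; the function name `enrich` and
the tuple layout of the events are made up for illustration, and the events are
assumed to arrive in order.

```python
from typing import Iterable, Iterator, Optional


def enrich(events: Iterable[tuple]) -> Iterator[tuple]:
    """Emit one enriched row per left record, using the latest right value.

    Hypothetical event shapes (illustration only, not a Flink API):
      ("left", measurement_time, x, y)
      ("right", modtime, some_value)
    """
    latest_some_value: Optional[int] = None
    for event in events:
        if event[0] == "right":
            # A right-side update only replaces the remembered value;
            # it does not emit anything by itself.
            _, _modtime, latest_some_value = event
        else:
            # Every left-side record triggers exactly one output row.
            _, measurement_time, x, y = event
            yield (measurement_time, x, y, latest_some_value)
```

In other words, output is driven only by the left side, and the right side acts
as a slowly changing lookup value.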

I tried an implementation using a versioned view. Here is a short example:

left_table
root
|-- measurement_time: TIMESTAMP(3) *ROWTIME*
|-- x: DOUBLE
|-- y: DOUBLE
|-- proctime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
|-- WATERMARK FOR measurement_time: TIMESTAMP(3) AS `measurement_time`

right_table
|-- some_value: INT
|-- id: INT
|-- modtime: TIMESTAMP(3) *ROWTIME*

The "id" is always defined as 1.

I perform the following operations:

t_env.create_temporary_view(
    "left_table",
    left_table.add_columns("1.cast(INT) AS left_artificial_key"))
t_env.create_temporary_view("right_table", right_table)

sql_view = """
    -- Define a versioned view
    CREATE VIEW versioned_right AS
    SELECT id, some_value, modtime
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id
                                  ORDER BY modtime DESC) AS rownum
        FROM right_table)
    WHERE rownum = 1
"""

view = t_env.execute_sql(sql_view)

sql = """
    SELECT left_table.*, versioned_right.some_value
    FROM left_table
    LEFT JOIN versioned_right FOR SYSTEM_TIME AS OF left_table.measurement_time
    ON left_table.left_artificial_key = versioned_right.id
"""

joined = t_env.sql_query(sql)


I observed the same behavior when using a lateral join.

Does anybody have an idea how to implement this join correctly?

Any comments or ideas are very welcome.

Thanks
Torben

