Dear List,
I am having trouble implementing a join between two streaming tables in the
Python Table API.
Each record of the left table should be enriched with the latest value of the
right_table. The right_table is updated only rarely (maybe every 15 minutes).
With my current join implementation, I only get output when the right table
changes. Instead, I want the joined table to emit an update every time I
receive a record on the left side, enriched with the most recent value from
the right side. I have not found a way to implement this with the desired
result.
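To make the desired semantics concrete, here is a minimal plain-Python sketch (not PyFlink; the event list and the function name are hypothetical). Every left record produces exactly one output, paired with the most recent right value received so far. A right-side update alone produces no output.

```python
from typing import Iterable, List, Optional, Tuple

# Each event is ("left", payload) or ("right", some_value), in arrival order.
Event = Tuple[str, object]

def enrich_with_latest(events: Iterable[Event]) -> List[Tuple[object, Optional[int]]]:
    """Emit one output per left record, paired with the latest right value seen so far."""
    latest_right: Optional[int] = None  # no right value received yet
    out: List[Tuple[object, Optional[int]]] = []
    for side, payload in events:
        if side == "right":
            latest_right = payload  # remember the newest right value, emit nothing
        else:
            out.append((payload, latest_right))  # every left record triggers an output
    return out

events = [("right", 10), ("left", "a"), ("left", "b"), ("right", 20), ("left", "c")]
print(enrich_with_latest(events))  # [('a', 10), ('b', 10), ('c', 20)]
```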
I tried an implementation using a versioned view. Here is a short example:
left_table
root
|-- measurement_time: TIMESTAMP(3) *ROWTIME*
|-- x: DOUBLE
|-- y: DOUBLE
|-- proctime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
|-- WATERMARK FOR measurement_time: TIMESTAMP(3) AS `measurement_time`
right_table
|-- some_value: INT
|-- id: INT
|-- modtime: TIMESTAMP(3) *ROWTIME*
The "id" is always defined as 1.
I perform the following operations:
t_env.create_temporary_view(
    "left_table",
    left_table.add_columns("1.cast(INT) AS left_artificial_key"))
t_env.create_temporary_view("right_table", right_table)
sql_view = """
-- Define a versioned view
CREATE VIEW versioned_right AS
SELECT id, some_value, modtime
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY id
                           ORDER BY modtime DESC) AS rownum
    FROM right_table)
WHERE rownum = 1
"""
view = t_env.execute_sql(sql_view)
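The ROW_NUMBER() deduplication in the view keeps, for each id, the row with the greatest modtime. A minimal plain-Python sketch of that rule follows. The RightRow type and the function name are hypothetical, and modtime is modeled as an integer (e.g. epoch milliseconds) instead of TIMESTAMP(3). On ties this sketch keeps the first row seen, whereas ROW_NUMBER() makes no such promise.

```python
from typing import Dict, List, NamedTuple

class RightRow(NamedTuple):
    id: int
    some_value: int
    modtime: int  # stand-in for TIMESTAMP(3), e.g. epoch milliseconds

def latest_per_id(rows: List[RightRow]) -> Dict[int, RightRow]:
    """Keep, per id, the row with the greatest modtime (ORDER BY modtime DESC, rownum = 1)."""
    latest: Dict[int, RightRow] = {}
    for row in rows:
        current = latest.get(row.id)
        if current is None or row.modtime > current.modtime:
            latest[row.id] = row
    return latest

rows = [RightRow(1, 10, 1000), RightRow(1, 20, 5000), RightRow(1, 15, 3000)]
print(latest_per_id(rows))  # {1: RightRow(id=1, some_value=20, modtime=5000)}
```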
sql = """
SELECT
    left_table.*, versioned_right.some_value
FROM left_table
LEFT JOIN versioned_right FOR SYSTEM_TIME AS OF left_table.measurement_time
    ON left_table.left_artificial_key = versioned_right.id
"""
joined = t_env.sql_query(sql)
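The toy model below may explain the observed behavior. Flink's documentation states that an event-time temporal join is triggered by watermarks from both the left and the right side. The model makes two further assumptions that are not part of the setup above: the right side's watermark follows modtime with no delay, and the operator's combined watermark is the minimum of its two input watermarks. The class and method names are hypothetical, and the model is not Flink's actual operator. If the model holds, a rarely updated right table holds the combined watermark back, so buffered left rows are released only when a new right record arrives.

```python
import bisect
from typing import List, Optional, Tuple

NEG_INF = float("-inf")
POS_INF = float("inf")

class EventTimeTemporalJoinSim:
    """Toy model of an event-time temporal LEFT JOIN for a single key (id = 1).

    Assumptions: each input's watermark equals the largest event time seen on
    that input (no out-of-orderness delay), and buffered left rows are emitted
    only once the combined watermark, min(left_wm, right_wm), has reached them.
    """

    def __init__(self) -> None:
        self.left_wm: float = NEG_INF
        self.right_wm: float = NEG_INF
        self.buffer: List[Tuple[int, str]] = []    # pending (measurement_time, payload)
        self.versions: List[Tuple[int, int]] = []  # sorted (modtime, some_value)

    def on_left(self, t: int, payload: str) -> List[Tuple[str, Optional[int]]]:
        self.buffer.append((t, payload))
        self.left_wm = max(self.left_wm, t)
        return self._emit()

    def on_right(self, modtime: int, value: int) -> List[Tuple[str, Optional[int]]]:
        bisect.insort(self.versions, (modtime, value))
        self.right_wm = max(self.right_wm, modtime)
        return self._emit()

    def _version_at(self, t: int) -> Optional[int]:
        # Latest version with modtime <= t, i.e. FOR SYSTEM_TIME AS OF t;
        # None mimics the NULL a LEFT JOIN produces when no version exists yet.
        i = bisect.bisect_right(self.versions, (t, POS_INF))
        return self.versions[i - 1][1] if i > 0 else None

    def _emit(self) -> List[Tuple[str, Optional[int]]]:
        wm = min(self.left_wm, self.right_wm)
        ready = sorted(row for row in self.buffer if row[0] <= wm)
        self.buffer = [row for row in self.buffer if row[0] > wm]
        return [(payload, self._version_at(t)) for t, payload in ready]


j = EventTimeTemporalJoinSim()
print(j.on_right(0, 10))  # [] : the left watermark is still -inf
print(j.on_left(1, "a"))  # [] : right watermark 0 holds the combined watermark at 0
print(j.on_left(2, "b"))  # [] : still held back by the right side
print(j.on_right(3, 20))  # [('a', 10), ('b', 10)] : released only by the right update
```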
I observed the same behavior when using a lateral join.
Does anybody have an idea how to implement this join correctly?
Any comments or ideas are very welcome.
Thanks
Torben
Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz
der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042,
Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des
Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen;
Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr.
Pierre Dominique Prümm, Dr. Matthias Zieschang