Hi, I'm trying to use temporal join in Table API to enrich a stream of pageview events with a slowly changing dimension of user information. The pageview events are in a kafka topic called *pageviews* and the user information are in a kafka topic keyed by *userid* and whenever there is an updated user event, it is appended to the *users* topic. I declare a table on the pageview topic with watermark strategy of *WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))* and a table on the users topic with watermark strategy of * WatermarkStrategy.forMonotonousTimestamps().*
Here is the code for the temporal join: Table pv = getPageview(env, tableEnv, properties). select( $("timestamp").as("pv_ts"), $("userid").as("pv_userid"), $("pageid").as("pv_pageid") ); Table usr = getUsers(env, tableEnv, properties) .select( $("timestamp").as("u_ts"), $("userid").as("u_userid"), $("regionid"), $("gender") ); TemporalTableFunction userFunction = usr.createTemporalTableFunction($("u_ts"), $("u_userid")); tableEnv.createTemporaryFunction("usrFun", userFunction); Table enrichedPV = pv.joinLateral(call("usrFun", $("pv_ts")), $("pv_userid").isEqual($("u_userid"))); enrichedPV.execute().print(); When I run this, I get result like the following which only is triggered when there are new messages pushed into both pageviews and users topics: +----+-------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+ | op | pv_ts | pv_userid | pv_pageid | u_ts | u_userid | regionid | gender | +----+-------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+ | +I | 2021-09-22 08:28:05.346 | User_8 | Page_99 | 2021-09-22 08:28:04.769 | User_8 | Region_1 | OTHER | | +I | 2021-09-22 08:28:12.377 | User_3 | Page_88 | 2021-09-22 08:28:08.823 | User_3 | Region_8 | FEMALE | | +I | 2021-09-22 08:28:15.385 | User_7 | Page_73 | 2021-09-22 08:28:07.817 | User_7 | Region_9 | OTHER | | +I | 2021-09-22 08:28:16.391 | User_7 | Page_97 | 2021-09-22 08:28:07.817 | User_7 | Region_9 | OTHER | | +I | 2021-09-22 08:28:17.396 | User_7 | Page_43 | 2021-09-22 08:28:07.817 | User_7 | Region_9 | OTHER | | +I | 2021-09-22 08:28:18.400 | User_6 | Page_43 | 2021-09-22 08:28:15.854 | User_6 | Region_5 | OTHER | However, I want to trigger a result whenever a new pageview message arrives and not wait on the user side. Do I have any obvious mistake in my code that I cannot get this behavior? Also is there any code example that I can try where the main stream is enriched when there is a new event regardless of having any new event in the dimension side? Flink documentation on temporal join especially for TableAPI is really thin! Thanks in advance.