Hi! What type of time attribute is u_ts? If it is an event time attribute, then the query you're running is an event time temporal table join, which will not emit a joined record until the watermarks of both inputs have risen above that record's row time.
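(Just for reference, whether u_ts is an event time attribute is decided where the table is defined, not in the join itself. Below is a minimal, hypothetical sketch of how a rowtime attribute is typically declared when converting a DataStream to a Table; I don't know what your getUsers() actually does, so the stream name here is made up.)

    import static org.apache.flink.table.api.Expressions.$;

    // "userStream" is assumed to be a DataStream with timestamps and watermarks
    // already assigned. The .rowtime() call is what turns "timestamp" into an
    // event time attribute; a temporal table function built on it then results
    // in an event time temporal table join.
    Table users = tableEnv.fromDataStream(
        userStream,
        $("userid"),
        $("regionid"),
        $("gender"),
        $("timestamp").rowtime());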
As the dimension table changes quite slowly, I would recommend using the processing time temporal table join (the so-called lookup table join) instead. See this example from the Scala API unit tests:

    val proctimeOrders: Table = util.addDataStream[(Long, String)](
      "ProctimeOrders", 'o_amount, 'o_currency, 'o_proctime.proctime)

    val proctimeRatesHistory: Table = util.addDataStream[(String, Int)](
      "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime)

The Table API documentation indeed lacks quite a lot of information. I would also recommend trying out the SQL API, which is a superset of the Table API and is more expressive and easier to understand. (A rough Java sketch of the processing time variant of your join is appended below the quoted message.)

John Smith <mylearningemail2...@gmail.com> wrote on Wed, Sep 22, 2021 at 4:45 PM:

> Hi,
>
> I'm trying to use a temporal join in the Table API to enrich a stream of
> pageview events with a slowly changing dimension of user information.
> The pageview events are in a Kafka topic called *pageviews*, and the user
> information is in a Kafka topic keyed by *userid*; whenever there is an
> updated user event, it is appended to the *users* topic.
> I declare a table on the pageviews topic with the watermark strategy
> *WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))* and a
> table on the users topic with the watermark strategy
> *WatermarkStrategy.forMonotonousTimestamps()*.
>
> Here is the code for the temporal join:
>
> Table pv = getPageview(env, tableEnv, properties)
>     .select(
>         $("timestamp").as("pv_ts"),
>         $("userid").as("pv_userid"),
>         $("pageid").as("pv_pageid"));
>
> Table usr = getUsers(env, tableEnv, properties)
>     .select(
>         $("timestamp").as("u_ts"),
>         $("userid").as("u_userid"),
>         $("regionid"),
>         $("gender"));
>
> TemporalTableFunction userFunction =
>     usr.createTemporalTableFunction($("u_ts"), $("u_userid"));
> tableEnv.createTemporaryFunction("usrFun", userFunction);
>
> Table enrichedPV = pv.joinLateral(call("usrFun", $("pv_ts")),
>     $("pv_userid").isEqual($("u_userid")));
>
> enrichedPV.execute().print();
>
> When I run this, I get a result like the following, which is only triggered
> when there are new messages pushed into both the pageviews and users topics:
>
> +----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
> | op | pv_ts                   | pv_userid | pv_pageid | u_ts                    | u_userid | regionid | gender |
> +----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
> | +I | 2021-09-22 08:28:05.346 | User_8    | Page_99   | 2021-09-22 08:28:04.769 | User_8   | Region_1 | OTHER  |
> | +I | 2021-09-22 08:28:12.377 | User_3    | Page_88   | 2021-09-22 08:28:08.823 | User_3   | Region_8 | FEMALE |
> | +I | 2021-09-22 08:28:15.385 | User_7    | Page_73   | 2021-09-22 08:28:07.817 | User_7   | Region_9 | OTHER  |
> | +I | 2021-09-22 08:28:16.391 | User_7    | Page_97   | 2021-09-22 08:28:07.817 | User_7   | Region_9 | OTHER  |
> | +I | 2021-09-22 08:28:17.396 | User_7    | Page_43   | 2021-09-22 08:28:07.817 | User_7   | Region_9 | OTHER  |
> | +I | 2021-09-22 08:28:18.400 | User_6    | Page_43   | 2021-09-22 08:28:15.854 | User_6   | Region_5 | OTHER  |
> +----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
>
> However, I want to trigger a result whenever a new pageview message
> arrives and not wait on the user side.
> Is there an obvious mistake in my code that prevents me from getting this
> behavior? Also, is there a code example I can try where the main stream is
> enriched whenever a new event arrives, regardless of whether there is any
> new event on the dimension side? The Flink documentation on temporal joins,
> especially for the Table API, is really thin!
>
> Thanks in advance.
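As mentioned above, here is a rough Java sketch of what the processing time variant of your join could look like, reusing the table and column names from your snippet. It assumes getPageview()/getUsers() can expose a processing time attribute (for example a column declared with .proctime() when the tables are created); take it as an illustration of the approach, not tested code.

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

    // Both sides carry a processing time attribute instead of the event time u_ts.
    Table pv = getPageview(env, tableEnv, properties)
        .select(
            $("userid").as("pv_userid"),
            $("pageid").as("pv_pageid"),
            $("proctime").as("pv_proctime"));   // processing time attribute

    Table usr = getUsers(env, tableEnv, properties)
        .select(
            $("userid").as("u_userid"),
            $("regionid"),
            $("gender"),
            $("proctime").as("u_proctime"));    // processing time attribute

    // Version the users table by processing time: the "version" of a user is
    // simply the latest row seen so far for that userid.
    TemporalTableFunction userFunction =
        usr.createTemporalTableFunction($("u_proctime"), $("u_userid"));
    tableEnv.createTemporaryFunction("usrFun", userFunction);

    // Look up the user as of the pageview's processing time, so each pageview
    // is joined and emitted as soon as it arrives, without waiting for
    // watermarks on the users topic.
    Table enrichedPV = pv.joinLateral(call("usrFun", $("pv_proctime")),
        $("pv_userid").isEqual($("u_userid")));

    enrichedPV.execute().print();

The trade-off is that the result is no longer deterministic with respect to event time: a pageview is always enriched with whatever user row happens to be the latest at the moment it is processed.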