Hi,

I'm trying to use a temporal join in the Table API to enrich a stream of pageview
events with a slowly changing dimension of user information.
The pageview events are in a Kafka topic called *pageviews*, and the user
information is in a Kafka topic called *users*, keyed by *userid*; whenever a
user is updated, a new user event is appended to the *users* topic.
I declare a table on the pageviews topic with a watermark strategy of
*WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))* and a
table on the users topic with a watermark strategy of
*WatermarkStrategy.forMonotonousTimestamps()*.
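For reference, here is a plain-Java model of the two watermark strategies. This is not Flink code: the class and method names are hypothetical, and it assumes the usual arithmetic of a bounded-out-of-orderness generator (watermark = max timestamp seen - bound - 1 ms), with monotonous timestamps treated as a zero bound. The point it illustrates is that each table's watermark only advances when that table receives a new event.

```java
import java.time.Duration;

/** Plain-Java model (not Flink code) of the two watermark strategies described above. */
public class WatermarkModel {

    /** Models a bounded-out-of-orderness generator: watermark = maxTs - bound - 1 ms. */
    static final class BoundedOutOfOrderness {
        private final long boundMillis;
        private long maxTimestamp;

        BoundedOutOfOrderness(Duration bound) {
            this.boundMillis = bound.toMillis();
            // Chosen so that the watermark is Long.MIN_VALUE before any event arrives.
            this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        }

        /** The watermark can only move forward when a new event arrives. */
        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long currentWatermark() {
            return maxTimestamp - boundMillis - 1;
        }
    }

    /** Monotonous timestamps modeled as a zero out-of-orderness bound. */
    static BoundedOutOfOrderness monotonous() {
        return new BoundedOutOfOrderness(Duration.ZERO);
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness pageviews = new BoundedOutOfOrderness(Duration.ofSeconds(1));
        BoundedOutOfOrderness users = monotonous();
        pageviews.onEvent(10_000L);
        users.onEvent(7_000L);
        System.out.println("pageviews watermark = " + pageviews.currentWatermark()); // 8999
        System.out.println("users watermark     = " + users.currentWatermark());     // 6999
    }
}
```

If no new user update arrives, the users watermark in this model stays at 6999 no matter how many pageviews come in.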

Here is the code for the temporal join:

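import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.TemporalTableFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// getPageview(...) and getUsers(...) are my helper methods (not shown) that
// create the Kafka-backed tables with the watermark strategies above.
Table pv = getPageview(env, tableEnv, properties)
        .select(
                $("timestamp").as("pv_ts"),
                $("userid").as("pv_userid"),
                $("pageid").as("pv_pageid"));

Table usr = getUsers(env, tableEnv, properties)
        .select(
                $("timestamp").as("u_ts"),
                $("userid").as("u_userid"),
                $("regionid"),
                $("gender"));

// Versioned view of the users table: time attribute u_ts, primary key u_userid.
TemporalTableFunction userFunction =
        usr.createTemporalTableFunction($("u_ts"), $("u_userid"));
tableEnv.createTemporaryFunction("usrFun", userFunction);

// For each pageview, look up the user version valid at pv_ts.
Table enrichedPV = pv.joinLateral(
        call("usrFun", $("pv_ts")),
        $("pv_userid").isEqual($("u_userid")));

enrichedPV.execute().print();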
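To make the expected join result concrete, here is a plain-Java model of the "as of" lookup that, as I understand it, an event-time temporal join performs: for each pageview, take the latest user version whose u_ts is less than or equal to pv_ts. The class name, the regionid-only payload, and the Region_2 update at 08:28:16 are hypothetical and only for illustration; the Region_9 version matches the User_7 rows in the result table.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Flink code) of an "as of" lookup against a versioned user table. */
public class AsOfLookupModel {

    // userid -> (u_ts in epoch millis -> regionid): every version of each user row
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    void addUserVersion(String userId, long uTs, String regionId) {
        versions.computeIfAbsent(userId, k -> new TreeMap<>()).put(uTs, regionId);
    }

    /** Returns the regionid valid at pvTs (latest version with u_ts <= pvTs), or null if none. */
    String regionAsOf(String userId, long pvTs) {
        TreeMap<Long, String> history = versions.get(userId);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(pvTs);
        return entry == null ? null : entry.getValue();
    }

    static long ts(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }

    public static void main(String[] args) {
        AsOfLookupModel model = new AsOfLookupModel();
        model.addUserVersion("User_7", ts("2021-09-22T08:28:07.817Z"), "Region_9");
        System.out.println(model.regionAsOf("User_7", ts("2021-09-22T08:28:15.385Z"))); // Region_9

        // Hypothetical later version: each pageview picks the version valid at its own pv_ts.
        model.addUserVersion("User_7", ts("2021-09-22T08:28:16.000Z"), "Region_2");
        System.out.println(model.regionAsOf("User_7", ts("2021-09-22T08:28:15.385Z"))); // Region_9
        System.out.println(model.regionAsOf("User_7", ts("2021-09-22T08:28:17.396Z"))); // Region_2
        System.out.println(model.regionAsOf("User_7", ts("2021-09-22T08:28:07.000Z"))); // null
    }
}
```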

When I run this, I get results like the following, which are only emitted
when new messages are pushed to both the pageviews and users topics:

+----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
| op |                   pv_ts | pv_userid | pv_pageid |                    u_ts | u_userid | regionid | gender |
+----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
| +I | 2021-09-22 08:28:05.346 |    User_8 |   Page_99 | 2021-09-22 08:28:04.769 |   User_8 | Region_1 |  OTHER |
| +I | 2021-09-22 08:28:12.377 |    User_3 |   Page_88 | 2021-09-22 08:28:08.823 |   User_3 | Region_8 | FEMALE |
| +I | 2021-09-22 08:28:15.385 |    User_7 |   Page_73 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:16.391 |    User_7 |   Page_97 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:17.396 |    User_7 |   Page_43 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:18.400 |    User_6 |   Page_43 | 2021-09-22 08:28:15.854 |   User_6 | Region_5 |  OTHER |
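A plain-Java model of when an event-time join can emit a buffered pageview may help frame the question. It assumes, as I understand Flink's event-time temporal join, that pageviews are buffered and only emitted once the join's watermark, the minimum of both input watermarks, passes their pv_ts. The class and method names are hypothetical, and the model covers only timing, not the lookup itself or Flink's state and timers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model (not Flink code) of emission timing in an event-time two-input join. */
public class EmissionModel {

    private long pageviewWatermark = Long.MIN_VALUE;
    private long userWatermark = Long.MIN_VALUE;
    private final List<Long> bufferedPageviews = new ArrayList<>();

    void onPageview(long pvTs) {
        bufferedPageviews.add(pvTs);
    }

    void onPageviewWatermark(long wm) {
        pageviewWatermark = Math.max(pageviewWatermark, wm);
    }

    void onUserWatermark(long wm) {
        userWatermark = Math.max(userWatermark, wm);
    }

    /** Emits (and removes) every buffered pageview with pv_ts <= min of both watermarks. */
    List<Long> fire() {
        long combined = Math.min(pageviewWatermark, userWatermark);
        List<Long> ready = new ArrayList<>();
        Iterator<Long> it = bufferedPageviews.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (ts <= combined) {
                ready.add(ts);
                it.remove();
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        EmissionModel join = new EmissionModel();
        join.onPageview(8_000L);
        join.onPageview(9_000L);
        join.onPageviewWatermark(9_999L);
        join.onUserWatermark(6_999L);          // last user event was at 7_000
        System.out.println(join.fire());       // []
        join.onUserWatermark(9_499L);          // a new user update at 9_500 arrives
        System.out.println(join.fire());       // [8000, 9000]
    }
}
```

In this model the pageviews stay buffered until the users side produces a new event, which would be consistent with the behavior described above.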

However, I want a result to be emitted whenever a new pageview message
arrives, without waiting on the user side.
Is there an obvious mistake in my code that prevents this behavior?
Also, is there a code example I can try in which the main stream is enriched
as soon as a new event arrives, regardless of whether there are any new
events on the dimension side? The Flink documentation on temporal joins,
especially for the Table API, is really thin!
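To pin down the semantics asked for above, here is a plain-Java model of "enrich on arrival": each pageview is joined immediately with the latest user version seen so far, with no buffering. This is a hypothetical model, not a Flink API; all names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model (not Flink code): enrich each pageview immediately on arrival. */
public class EnrichOnArrivalModel {

    // userid -> latest regionid received so far, regardless of its u_ts
    private final Map<String, String> latestRegion = new HashMap<>();

    void onUserUpdate(String userId, String regionId) {
        latestRegion.put(userId, regionId);
    }

    /** Returns the enrichment right away, or null if no version of the user has arrived yet. */
    String onPageview(String userId) {
        return latestRegion.get(userId);
    }

    public static void main(String[] args) {
        EnrichOnArrivalModel model = new EnrichOnArrivalModel();
        model.onUserUpdate("User_7", "Region_9");
        System.out.println(model.onPageview("User_7")); // Region_9
        System.out.println(model.onPageview("User_1")); // null
    }
}
```

The trade-off in this model is that the result depends on arrival order rather than on comparing u_ts with pv_ts, so a user update that arrives late is not applied to earlier pageviews.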

Thanks in advance.
