Hi Team,

I'm doing a POC with Flink to see whether it is a good fit for my use
case.

As part of this, I need to filter out duplicate items, so I wrote the query
below to keep only the latest record per key, based on timestamp. For
instance, I have a "Users" table that may contain multiple messages for the
same "userId", and the query returns only the latest message for each
"userId":

Table uniqueUsers = tEnv.sqlQuery(
    "SELECT * FROM Users WHERE (userId, userTimestamp) IN "
    + "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");

The above query works as expected: the result contains only the latest
record for each user, based on timestamp.
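
To make the intended semantics concrete, here is a minimal plain-Java sketch
of what the query is meant to compute (no Flink involved). The UserMsg class,
its payload field, and the latestPerUser method are hypothetical names used
only for illustration. One difference from the SQL: the query above returns
every row that ties on the maximum timestamp, while this sketch keeps only
the first such row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPerUser {

    // Hypothetical row type; the real Users table may have other columns.
    static final class UserMsg {
        final String userId;
        final long userTimestamp;
        final String payload;

        UserMsg(String userId, long userTimestamp, String payload) {
            this.userId = userId;
            this.userTimestamp = userTimestamp;
            this.payload = payload;
        }
    }

    // For each userId, keep the message with the largest userTimestamp.
    // On equal timestamps the message seen first is kept (assumption;
    // the SQL query would return all tied rows).
    static List<UserMsg> latestPerUser(List<UserMsg> users) {
        Map<String, UserMsg> latest = new LinkedHashMap<>();
        for (UserMsg u : users) {
            latest.merge(u.userId, u,
                (kept, next) -> next.userTimestamp > kept.userTimestamp ? next : kept);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<UserMsg> users = Arrays.asList(
            new UserMsg("u1", 1L, "old"),
            new UserMsg("u2", 5L, "only"),
            new UserMsg("u1", 3L, "new"));
        for (UserMsg u : latestPerUser(users)) {
            System.out.println(u.userId + " -> " + u.payload);
        }
    }
}
```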

The issue is that when I use the "uniqueUsers" table multiple times in a
JOIN, the Flink dashboard shows multiple tasks for the same query that
creates "uniqueUsers". It creates one copy of that task for every reference
to the table.

Below is the JOIN query.
tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
Table joinOut = tEnv.sqlQuery(
    "SELECT * FROM Car c "
    + "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId "
    + "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId "
    + "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId "
    + "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");

Could someone please help me understand how I can avoid these duplicate
tasks?


Thanks,
R Kandoji
