Hi RKandoji,

FYI: Blink-planner subplan reusing: [1] 1.9 available.

       Join                      Join
     /      \                  /      \
 Filter1  Filter2          Filter1  Filter2
    |        |        =>       \     /
 Project1 Project2            Project1
    |        |                   |
  Scan1    Scan2               Scan1


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala

Best,
Jingsong Lee

On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com> wrote:

> Hi RKandoji~
>
> Could you provide more info about your poc environment?
> Stream or batch? Flink planner or blink planner?
> AFAIK, blink planner has done some optimization to deal such duplicate
> task for one same query. You can have a try with blink planner :
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>
> Best,
> Terry Wang
>
>
>
> 2019年12月30日 03:07,RKandoji <rkand...@gmail.com> 写道:
>
> Hi Team,
>
> I'm doing a POC with flink to understand if it's a good fit for my use
> case.
>
> As part of the process, I need to filter duplicate items and created below
> query to get only the latest records based on timestamp. For instance, I
> have "Users" table which may contain multiple messages for the same
> "userId". So I wrote below query to get only the latest message for a given
> "userId"
>
> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
> userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
> userId)");
>
> The above query works as expected and contains only the latest users based
> on timestamp.
>
> The issue is when I use "uniqueUsers" table multiple times in a JOIN
> operation, I see multiple tasks in the flink dashboard for the same query
> that is creating "uniqueUsers" table. It is simply creating as many tasks
> as many times I'm using the table.
>
> Below is the JOIN query.
> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
> Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
>                                        LEFT JOIN uniqueUsersTbl aa ON
> c.userId = aa.userId
>                                        LEFT JOIN uniqueUsersTbl ab
> ON c.ownerId = ab.userId
>                                        LEFT JOIN uniqueUsersTbl ac ON
> c.sellerId = ac.userId
>                                        LEFT JOIN uniqueUsersTbl ad
> ON c.buyerId = ad.userId");
>
> Could someone please help me understand how I can avoid these duplicate
> tasks?
>
>
> Thanks,
> R Kandoji
>
>
>

-- 
Best, Jingsong Lee

Reply via email to