Hi RKandoji,

FYI: the Blink planner supports subplan reuse [1], which is available in 1.9.
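As a rough illustration of what subplan reuse means, here is a toy Java sketch (plain JDK, no Flink). Each plan node carries a structural digest, and a bottom-up pass replaces subtrees with identical digests by one shared instance. The Node class, the operator strings and the digest format are hypothetical simplifications. The real logic is the SubplanReuser class linked as [1], which works on the planner's own plan nodes. The sample plan is shaped like the one in the diagram that follows, with two identical Project/Scan branches.

```java
import java.util.*;

// Toy model of digest-based subplan reuse. Node, the operator strings and the
// digest format are hypothetical; this is NOT Flink's SubplanReuser.
public class SubplanReuseSketch {

    static final class Node {
        final String op;            // operator description, e.g. "Scan[T]"
        final List<Node> inputs;    // child plan nodes
        final String digest;        // structural digest: op plus the digests of all inputs

        Node(String op, Node... inputs) {
            this.op = op;
            this.inputs = List.of(inputs);
            if (this.inputs.isEmpty()) {
                this.digest = op;
            } else {
                StringJoiner joiner = new StringJoiner(",", op + "(", ")");
                for (Node in : this.inputs) {
                    joiner.add(in.digest);
                }
                this.digest = joiner.toString();
            }
        }
    }

    // Bottom-up rewrite: children are deduplicated first, then the rebuilt node is
    // replaced by the first node already seen with the same digest, if any.
    static Node reuse(Node node, Map<String, Node> seen) {
        Node[] newInputs = new Node[node.inputs.size()];
        for (int i = 0; i < newInputs.length; i++) {
            newInputs[i] = reuse(node.inputs.get(i), seen);
        }
        Node rebuilt = new Node(node.op, newInputs);
        return seen.computeIfAbsent(rebuilt.digest, d -> rebuilt);
    }

    // Counts distinct node instances (by reference) reachable from root.
    static int countDistinct(Node root) {
        Set<Node> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        Deque<Node> stack = new ArrayDeque<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            Node n = stack.pop();
            if (visited.add(n)) {
                n.inputs.forEach(stack::push);
            }
        }
        return visited.size();
    }

    // Join over two Filters whose Project/Scan branches are structurally identical.
    static Node samplePlan() {
        Node left = new Node("Filter[a > 1]", new Node("Project[a, b]", new Node("Scan[T]")));
        Node right = new Node("Filter[b < 5]", new Node("Project[a, b]", new Node("Scan[T]")));
        return new Node("Join[a = b]", left, right);
    }

    public static void main(String[] args) {
        Node plan = samplePlan();
        System.out.println("before reuse: " + countDistinct(plan) + " distinct nodes");
        Node reused = reuse(plan, new HashMap<>());
        System.out.println("after reuse: " + countDistinct(reused) + " distinct nodes");
    }
}
```

Running main reports 7 distinct nodes before reuse and 5 after: the two Filters stay separate because their conditions differ, while the identical Project and Scan subtrees collapse into one shared branch.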

        Join                            Join
       /    \                          /    \
   Filter1  Filter2               Filter1  Filter2
      |        |          =>          \    /
   Project1 Project2                 Project1
      |        |                        |
    Scan1    Scan2                    Scan1

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala

Best,
Jingsong Lee

On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com> wrote:

> Hi RKandoji~
>
> Could you provide more info about your POC environment?
> Stream or batch? Flink planner or Blink planner?
> AFAIK, the Blink planner has some optimizations to deal with such duplicate
> tasks for the same query. You can give the Blink planner a try:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>
> Best,
> Terry Wang
>
>
> On Dec 30, 2019, at 03:07, RKandoji <rkand...@gmail.com> wrote:
>
> Hi Team,
>
> I'm doing a POC with Flink to understand whether it's a good fit for my
> use case.
>
> As part of the process, I need to filter out duplicate items, so I created
> the query below to get only the latest records based on timestamp. For
> instance, I have a "Users" table which may contain multiple messages for
> the same "userId", so I wrote the query below to get only the latest
> message for a given "userId":
>
> Table uniqueUsers = tEnv.sqlQuery(
>     "SELECT * FROM Users WHERE (userId, userTimestamp) IN " +
>     "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");
>
> The above query works as expected and returns only the latest users based
> on timestamp.
>
> The issue is that when I use the "uniqueUsers" table multiple times in a
> JOIN operation, I see multiple tasks in the Flink dashboard for the same
> query that creates the "uniqueUsers" table. It simply creates as many
> tasks as the number of times I use the table.
>
> Below is the JOIN query.
>
> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
> Table joinOut = tEnv.sqlQuery(
>     "SELECT * FROM Car c " +
>     "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId " +
>     "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId " +
>     "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId " +
>     "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>
> Could someone please help me understand how I can avoid these duplicate
> tasks?
>
> Thanks,
> R Kandoji

--
Best,
Jingsong Lee
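For reference, the semantics of the two quoted queries can be sketched in plain Java (no Flink). The sketch evaluates the IN (... MAX ... GROUP BY) deduplication once and then probes that single result for all four LEFT JOIN keys, which is roughly the effect subplan reuse aims for in the physical plan. The User and Car records, the carId and payload fields, and the helper names are hypothetical; only userId, userTimestamp, ownerId, sellerId and buyerId come from the queries. It assumes at most one latest row per userId. With timestamp ties the IN query keeps every tied row, and a real LEFT JOIN would then emit one output row per match.

```java
import java.util.*;

// Plain-Java model of the quoted queries; all record and method names are hypothetical.
public class LatestUserSketch {
    record User(String userId, long userTimestamp, String payload) {}
    record Car(String carId, String userId, String ownerId, String sellerId, String buyerId) {}
    // One output row of the four LEFT JOINs; a null field means "no matching user".
    record JoinedRow(Car car, User user, User owner, User seller, User buyer) {}

    // Mirrors: SELECT * FROM Users WHERE (userId, userTimestamp) IN
    //          (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)
    static List<User> latestUsers(List<User> users) {
        Map<String, Long> maxTs = new HashMap<>();
        for (User u : users) {
            maxTs.merge(u.userId(), u.userTimestamp(), Long::max);
        }
        List<User> out = new ArrayList<>();
        for (User u : users) {
            if (u.userTimestamp() == maxTs.get(u.userId())) {
                out.add(u);  // keeps every row that carries its user's max timestamp
            }
        }
        return out;
    }

    // The deduplicated users are computed ONCE and indexed by userId; the same index
    // then serves all four join keys. Assumes no timestamp ties per userId.
    static List<JoinedRow> joinCars(List<Car> cars, List<User> users) {
        Map<String, User> byId = new HashMap<>();
        for (User u : latestUsers(users)) {
            byId.put(u.userId(), u);
        }
        List<JoinedRow> rows = new ArrayList<>();
        for (Car c : cars) {
            rows.add(new JoinedRow(c, byId.get(c.userId()), byId.get(c.ownerId()),
                    byId.get(c.sellerId()), byId.get(c.buyerId())));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<User> users = List.of(
                new User("u1", 10L, "old"),
                new User("u1", 20L, "new"),
                new User("u2", 5L, "only"));
        List<Car> cars = List.of(new Car("c1", "u1", "u2", "u3", "u1"));
        JoinedRow row = joinCars(cars, users).get(0);
        System.out.println(row.user().payload());
        System.out.println(row.owner().payload());
        System.out.println(row.seller());
    }
}
```

Running main prints new, only and null: the car's user and buyer both resolve to the same deduplicated u1 row, and the missing seller u3 stays null, as LEFT JOIN padding would.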