Hi RKandoji,

In theory, you don't need to do anything.
First, the optimizer will deduplicate nodes during optimization.
Second, if the optimized plan still has duplicate nodes after SQL
optimization, the planner will automatically reuse them.
There are config options that control whether plans are reused. Their
default value is true, so you don't need to modify them.
- table.optimizer.reuse-sub-plan-enabled
- table.optimizer.reuse-source-enabled
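
To make the reuse step above more concrete, here is a toy Java sketch of
digest-based subplan reuse. It is not Flink code and does not use the Flink
API: PlanNode, digest(), reuse() and countDistinct() are hypothetical names
made up for illustration, and the real SubplanReuser may work differently.
The idea is only that two subtrees with the same structural description can
be replaced by one shared instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy plan node (hypothetical, not a Flink class): an operator name plus its inputs. */
final class PlanNode {
    final String name;
    final List<PlanNode> inputs;

    PlanNode(String name, PlanNode... inputs) {
        this.name = name;
        this.inputs = new ArrayList<>(Arrays.asList(inputs));
    }

    /** Structural description of this subtree, e.g. "Project(Scan)". */
    String digest() {
        StringBuilder sb = new StringBuilder(name);
        if (!inputs.isEmpty()) {
            sb.append('(');
            for (int i = 0; i < inputs.size(); i++) {
                if (i > 0) {
                    sb.append(',');
                }
                sb.append(inputs.get(i).digest());
            }
            sb.append(')');
        }
        return sb.toString();
    }
}

final class SubplanReuseSketch {

    /** Rewrites the tree bottom-up so that subtrees with equal digests share one instance. */
    static PlanNode reuse(PlanNode node, Map<String, PlanNode> seen) {
        for (int i = 0; i < node.inputs.size(); i++) {
            node.inputs.set(i, reuse(node.inputs.get(i), seen));
        }
        // Keep the first node registered for a digest; later duplicates are replaced by it.
        PlanNode existing = seen.putIfAbsent(node.digest(), node);
        return existing != null ? existing : node;
    }

    /** Counts distinct node instances (by identity) reachable from the root. */
    static int countDistinct(PlanNode root) {
        Set<PlanNode> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        collect(root, visited);
        return visited.size();
    }

    private static void collect(PlanNode node, Set<PlanNode> visited) {
        if (visited.add(node)) {
            for (PlanNode input : node.inputs) {
                collect(input, visited);
            }
        }
    }

    public static void main(String[] args) {
        // Two branches that differ only in their filter, as in the diagram quoted below.
        PlanNode left = new PlanNode("Filter1", new PlanNode("Project", new PlanNode("Scan")));
        PlanNode right = new PlanNode("Filter2", new PlanNode("Project", new PlanNode("Scan")));
        PlanNode join = new PlanNode("Join", left, right);

        System.out.println("nodes before reuse: " + countDistinct(join));
        PlanNode shared = reuse(join, new HashMap<>());
        System.out.println("nodes after reuse:  " + countDistinct(shared));
    }
}
```

In this sketch the two Project/Scan chains collapse into one shared chain,
while Filter1 and Filter2 stay separate because their digests differ.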

Best,
Jingsong Lee

On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com> wrote:

> Thanks Terry and Jingsong,
>
> Currently I'm on version 1.8 using the Flink planner for stream processing.
> I'll switch to version 1.9 to try out the blink planner.
>
> Could you please point me to any examples (Java preferred) using
> SubplanReuser?
>
> Thanks,
> RK
>
> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
>> Hi RKandoji,
>>
>> FYI: the blink planner supports subplan reuse [1], available in 1.9.
>>
>>        Join                      Join
>>      /      \                  /      \
>>  Filter1  Filter2          Filter1  Filter2
>>     |        |        =>       \     /
>>  Project1 Project2            Project1
>>     |        |                   |
>>   Scan1    Scan2               Scan1
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>
>> Best,
>> Jingsong Lee
>>
>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com> wrote:
>>
>>> Hi RKandoji~
>>>
>>> Could you provide more info about your poc environment?
>>> Stream or batch? Flink planner or blink planner?
>>> AFAIK, the blink planner has some optimizations to deal with such duplicate
>>> tasks within the same query. You can give the blink planner a try:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>
>>> Best,
>>> Terry Wang
>>>
>>>
>>>
>>> On Dec 30, 2019, at 03:07, RKandoji <rkand...@gmail.com> wrote:
>>>
>>> Hi Team,
>>>
>>> I'm doing a POC with flink to understand if it's a good fit for my use
>>> case.
>>>
>>> As part of the process, I need to filter out duplicate items, so I created
>>> the query below to get only the latest records based on timestamp. For
>>> instance, I have a "Users" table which may contain multiple messages for the
>>> same "userId", so I wrote the query below to get only the latest message for a
>>> given "userId":
>>>
>>> Table uniqueUsers = tEnv.sqlQuery(
>>>     "SELECT * FROM Users WHERE (userId, userTimestamp) IN "
>>>         + "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");
>>>
>>> The above query works as expected and contains only the latest users
>>> based on timestamp.
>>>
>>> The issue is that when I use the "uniqueUsers" table multiple times in a JOIN
>>> operation, I see multiple tasks in the Flink dashboard for the same query
>>> that creates the "uniqueUsers" table. It simply creates as many tasks
>>> as the number of times I use the table.
>>>
>>> Below is the JOIN query.
>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>> Table joinOut = tEnv.sqlQuery(
>>>     "SELECT * FROM Car c "
>>>         + "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId "
>>>         + "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId "
>>>         + "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId "
>>>         + "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>
>>> Could someone please help me understand how I can avoid these duplicate
>>> tasks?
>>>
>>>
>>> Thanks,
>>> R Kandoji
>>>
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee
