Blink planner was introduced in 1.9. We recommend use blink planner after
1.9.
After some bug fix, I think the latest version of 1.9 is OK. The production
environment has also been set up in some places.

Best,
Jingsong Lee

On Wed, Jan 1, 2020 at 3:24 AM RKandoji <rkand...@gmail.com> wrote:

> Thanks Jingsong and Kurt for more details.
>
> Yes, I'm planning to try out DeDuplication when I'm done upgrading to
> version 1.9. Hopefully deduplication is done by only one task and reused
> everywhere else.
>
> One more follow-up question, I see "For production use cases, we
> recommend the old planner that was present before Flink 1.9 for now." warning
> here https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
> This is actually the reason why started with version 1.8, could you please
> let me know your opinion about this? and do you think there is any
> production code running on version 1.9
>
> Thanks,
> Reva
>
>
>
>
> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> BTW, you could also have a more efficient version of deduplicating
>> user table by using the topn feature [1].
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>
>>
>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> In theory, you don't need to do something.
>>> First, the optimizer will optimize by doing duplicate nodes.
>>> Second, after SQL optimization, if the optimized plan still has
>>> duplicate nodes, the planner will automatically reuse them.
>>> There are config options to control whether we should reuse plan, their
>>> default value is true. So you don't need modify them.
>>> - table.optimizer.reuse-sub-plan-enabled
>>> - table.optimizer.reuse-source-enabled
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com> wrote:
>>>
>>>> Thanks Terry and Jingsong,
>>>>
>>>> Currently I'm on 1.8 version using Flink planner for stream proessing,
>>>> I'll switch to 1.9 version to try out blink planner.
>>>>
>>>> Could you please point me to any examples (Java preferred) using
>>>> SubplanReuser?
>>>>
>>>> Thanks,
>>>> RK
>>>>
>>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi RKandoji,
>>>>>
>>>>> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>>>>>
>>>>>        Join                      Join
>>>>>      /      \                  /      \
>>>>>  Filter1  Filter2          Filter1  Filter2
>>>>>     |        |        =>       \     /
>>>>>  Project1 Project2            Project1
>>>>>     |        |                   |
>>>>>   Scan1    Scan2               Scan1
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi RKandoji~
>>>>>>
>>>>>> Could you provide more info about your poc environment?
>>>>>> Stream or batch? Flink planner or blink planner?
>>>>>> AFAIK, blink planner has done some optimization to deal such
>>>>>> duplicate task for one same query. You can have a try with blink planner 
>>>>>> :
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>>>
>>>>>> Best,
>>>>>> Terry Wang
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2019年12月30日 03:07,RKandoji <rkand...@gmail.com> 写道:
>>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I'm doing a POC with flink to understand if it's a good fit for my
>>>>>> use case.
>>>>>>
>>>>>> As part of the process, I need to filter duplicate items and created
>>>>>> below query to get only the latest records based on timestamp. For
>>>>>> instance, I have "Users" table which may contain multiple messages for 
>>>>>> the
>>>>>> same "userId". So I wrote below query to get only the latest message for 
>>>>>> a
>>>>>> given "userId"
>>>>>>
>>>>>> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
>>>>>> userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
>>>>>> userId)");
>>>>>>
>>>>>> The above query works as expected and contains only the latest users
>>>>>> based on timestamp.
>>>>>>
>>>>>> The issue is when I use "uniqueUsers" table multiple times in a JOIN
>>>>>> operation, I see multiple tasks in the flink dashboard for the same query
>>>>>> that is creating "uniqueUsers" table. It is simply creating as many tasks
>>>>>> as many times I'm using the table.
>>>>>>
>>>>>> Below is the JOIN query.
>>>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>>>> Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
>>>>>>                                        LEFT JOIN uniqueUsersTbl aa ON
>>>>>> c.userId = aa.userId
>>>>>>                                        LEFT JOIN uniqueUsersTbl ab
>>>>>> ON c.ownerId = ab.userId
>>>>>>                                        LEFT JOIN uniqueUsersTbl ac
>>>>>> ON c.sellerId = ac.userId
>>>>>>                                        LEFT JOIN uniqueUsersTbl ad
>>>>>> ON c.buyerId = ad.userId");
>>>>>>
>>>>>> Could someone please help me understand how I can avoid these
>>>>>> duplicate tasks?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> R Kandoji
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

-- 
Best, Jingsong Lee

Reply via email to