The blink planner was introduced in 1.9, and we recommend using it from 1.9 onwards. After some bug fixes, I think the latest 1.9.x release is OK. It has also been put into production in some places.
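If it helps, here is a rough, untested sketch of picking the blink planner for a streaming job on 1.9 (the class name is just for illustration; the two reuse options listed further down the thread already default to true, the last lines only make that explicit):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkPlannerExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // In 1.9 the old planner is still the default, so select blink explicitly.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Sub-plan / source reuse is on by default; set here only to make it explicit.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-source-enabled", true);

        // Register your sources and run your queries as before.
    }
}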
Best,
Jingsong Lee

On Wed, Jan 1, 2020 at 3:24 AM RKandoji <rkand...@gmail.com> wrote:

> Thanks Jingsong and Kurt for more details.
>
> Yes, I'm planning to try out deduplication when I'm done upgrading to
> version 1.9. Hopefully deduplication is done by only one task and reused
> everywhere else.
>
> One more follow-up question: I see the warning "For production use cases,
> we recommend the old planner that was present before Flink 1.9 for now."
> here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
> This is actually the reason why I started with version 1.8. Could you
> please let me know your opinion on this? And do you think there is any
> production code running on version 1.9?
>
> Thanks,
> Reva
>
> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> BTW, you could also have a more efficient version of deduplicating the
>> user table by using the top-n feature [1].
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>
>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> In theory, you don't need to do anything.
>>> First, the optimizer will deduplicate nodes during optimization.
>>> Second, after SQL optimization, if the optimized plan still has
>>> duplicate nodes, the planner will automatically reuse them.
>>> There are config options to control whether the plan should be reused;
>>> their default value is true, so you don't need to modify them:
>>> - table.optimizer.reuse-sub-plan-enabled
>>> - table.optimizer.reuse-source-enabled
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com> wrote:
>>>
>>>> Thanks Terry and Jingsong,
>>>>
>>>> Currently I'm on version 1.8 using the Flink planner for stream
>>>> processing; I'll switch to version 1.9 to try out the blink planner.
>>>>
>>>> Could you please point me to any examples (Java preferred) using
>>>> SubplanReuser?
>>>>
>>>> Thanks,
>>>> RK
>>>>
>>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi RKandoji,
>>>>>
>>>>> FYI: blink-planner subplan reusing [1], available in 1.9:
>>>>>
>>>>>       Join                      Join
>>>>>      /    \                    /    \
>>>>>  Filter1  Filter2          Filter1  Filter2
>>>>>     |        |       =>        \    /
>>>>>  Project1 Project2            Project1
>>>>>     |        |                    |
>>>>>   Scan1    Scan2                Scan1
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi RKandoji~
>>>>>>
>>>>>> Could you provide more info about your POC environment?
>>>>>> Stream or batch? Flink planner or blink planner?
>>>>>> AFAIK, the blink planner has done some optimization to deal with such
>>>>>> duplicate tasks for the same query. You can have a try with the blink
>>>>>> planner:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>>>
>>>>>> Best,
>>>>>> Terry Wang
>>>>>>
>>>>>> On Dec 30, 2019, at 03:07, RKandoji <rkand...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I'm doing a POC with Flink to understand if it's a good fit for my
>>>>>> use case.
>>>>>>
>>>>>> As part of the process, I need to filter duplicate items, and I
>>>>>> created the below query to get only the latest records based on
>>>>>> timestamp.
>>>>>> For instance, I have a "Users" table which may contain multiple
>>>>>> messages for the same "userId", so I wrote the below query to get only
>>>>>> the latest message for a given "userId":
>>>>>>
>>>>>> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users " +
>>>>>>     "WHERE (userId, userTimestamp) IN " +
>>>>>>     "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");
>>>>>>
>>>>>> The above query works as expected and contains only the latest users
>>>>>> based on timestamp.
>>>>>>
>>>>>> The issue is that when I use the "uniqueUsers" table multiple times in
>>>>>> a JOIN operation, I see multiple tasks in the Flink dashboard for the
>>>>>> same query that creates "uniqueUsers". It simply creates as many tasks
>>>>>> as the number of times I use the table.
>>>>>>
>>>>>> Below is the JOIN query:
>>>>>>
>>>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>>>> Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c " +
>>>>>>     "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId " +
>>>>>>     "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId " +
>>>>>>     "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId " +
>>>>>>     "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>>>>
>>>>>> Could someone please help me understand how I can avoid these
>>>>>> duplicate tasks?
>>>>>>
>>>>>> Thanks,
>>>>>> R Kandoji
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>
>>> --
>>> Best, Jingsong Lee

--
Best, Jingsong Lee
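PS: for the deduplication Kurt mentioned, a rough sketch of the top-n pattern from the 1.9 docs (his link [1]), reusing the userId/userTimestamp columns from your query. I have not run this against your schema, so please verify; it assumes the same tEnv and registered "Users" table as in your snippet:

// Keep only the newest row per userId; the outer SELECT must not include row_num.
// Add the other Users columns you need to the outer SELECT.
Table uniqueUsers = tEnv.sqlQuery(
    "SELECT userId, userTimestamp " +
    "FROM (" +
    "  SELECT *, ROW_NUMBER() OVER (" +
    "    PARTITION BY userId ORDER BY userTimestamp DESC) AS row_num " +
    "  FROM Users" +
    ") " +
    "WHERE row_num = 1");

This keeps the latest record per userId without the IN subquery over the same table.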