hi Kurt,

Thanks for the additional info.

RK

On Sun, Jan 5, 2020 at 8:33 PM Kurt Young <ykt...@gmail.com> wrote:

> Another common skew case we've seen is null handling, the value of the
> join key
> is NULL. We will shuffle the NULL value into one task even if the join
> condition
> won't stand by definition.
>
> For DeDuplication, I just want to make sure this behavior meets your
> requirement.
> Because for some other usages, users might be only interested with the
> earliest
> records because the updating for the same key is purely redundant, like
> caused by
> upstream failure and process the same data again. In that case, each key
> will only have
> at most one record and you won't face any join key skewing issue.
>
> Best,
> Kurt
>
>
> On Mon, Jan 6, 2020 at 6:55 AM RKandoji <rkand...@gmail.com> wrote:
>
>> Hi Kurt,
>>
>> I understand what you mean, some userIds may appear more frequently than
>> the others but this distribution doesn't look in proportionate with the
>> data skew. Do you think of any other possible reasons or anything I can try
>> out to investigate this more?
>>
>> For DeDuplication, I query for the latest record. Sorry I didn't follow
>> above sentence, do you mean that for each update to user table the
>> record(s) that were updated will be sent via retract stream.I think that's
>> expected as I need to process latest records, as long as it is sending only
>> the record(s) that's been updated.
>>
>> Thanks,
>> RKandoji
>>
>> On Fri, Jan 3, 2020 at 9:57 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Hi RKandoji,
>>>
>>> It looks like you have a data skew issue with your input data. Some or
>>> maybe only one "userId" appears more frequent than others. For join
>>> operator to work correctly, Flink will apply "shuffle by join key"
>>> before the
>>> operator, so same "userId" will go to the same sub-task to perform join
>>> operation. In this case, I'm afraid there is nothing much you can do for
>>> now.
>>>
>>> BTW, for the DeDuplicate, do you keep the latest record or the earliest?
>>> If
>>> you keep the latest version, Flink will tigger retraction and then send
>>> the latest
>>> record again every time when your user table changes.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Sat, Jan 4, 2020 at 5:09 AM RKandoji <rkand...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks a ton for the help with earlier questions, I updated code to
>>>> version 1.9 and started using Blink Planner (DeDuplication). This is
>>>> working as expected!
>>>>
>>>> I have a new question, but thought of asking in the same email chain as
>>>> this has more context about my use case etc.
>>>>
>>>> Workflow:
>>>> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
>>>> input data, performing JOINs and writing the joined data to another Kafka
>>>> topic.
>>>>
>>>> Issue:
>>>> I set Parallelism to 8 and on analyzing the subtasks found that the
>>>> data is not distributed well among 8 parallel tasks for the last Join
>>>> query. One of a subtask is taking huge load, whereas others taking pretty
>>>> low load.
>>>>
>>>> Tried a couple of things below, but no use. Not sure if they are
>>>> actually related to the problem as I couldn't yet understand what's the
>>>> issue here.
>>>> 1. increasing the number of partitions of output Kafka topic.
>>>> 2. tried adding keys to output so key partitioning happens at Kafka end.
>>>>
>>>> Below is a snapshot for reference:
>>>> [image: image.png]
>>>>
>>>> Below are the config changes I made:
>>>>
>>>> taskmanager.numberOfTaskSlots: 8
>>>> parallelism.default: 8
>>>> jobmanager.heap.size: 5000m
>>>> taskmanager.heap.size: 5000m
>>>> state.backend: rocksdb
>>>> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
>>>> state.backend.incremental: true
>>>>
>>>> I don't see any errors and job seems to be running smoothly (and
>>>> slowly). I need to make it distribute the load well for faster processing,
>>>> any pointers on what could be wrong and how to fix it would be very 
>>>> helpful.
>>>>
>>>> Thanks,
>>>> RKandoji
>>>>
>>>>
>>>> On Fri, Jan 3, 2020 at 1:06 PM RKandoji <rkand...@gmail.com> wrote:
>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li <jingsongl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes,
>>>>>>
>>>>>> 1.9.2 or Coming soon 1.10
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji <rkand...@gmail.com> wrote:
>>>>>>
>>>>>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>>>>>>
>>>>>>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li <jingsongl...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Blink planner was introduced in 1.9. We recommend use blink planner
>>>>>>>> after 1.9.
>>>>>>>> After some bug fix, I think the latest version of 1.9 is OK. The
>>>>>>>> production environment has also been set up in some places.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong Lee
>>>>>>>>
>>>>>>>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji <rkand...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Jingsong and Kurt for more details.
>>>>>>>>>
>>>>>>>>> Yes, I'm planning to try out DeDuplication when I'm done upgrading
>>>>>>>>> to version 1.9. Hopefully deduplication is done by only one task and 
>>>>>>>>> reused
>>>>>>>>> everywhere else.
>>>>>>>>>
>>>>>>>>> One more follow-up question, I see "For production use cases, we
>>>>>>>>> recommend the old planner that was present before Flink 1.9 for now." 
>>>>>>>>> warning
>>>>>>>>> here
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>>>>>>>>> This is actually the reason why started with version 1.8, could
>>>>>>>>> you please let me know your opinion about this? and do you think 
>>>>>>>>> there is
>>>>>>>>> any production code running on version 1.9
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Reva
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young <ykt...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> BTW, you could also have a more efficient version of deduplicating
>>>>>>>>>> user table by using the topn feature [1].
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Kurt
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <
>>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi RKandoji,
>>>>>>>>>>>
>>>>>>>>>>> In theory, you don't need to do something.
>>>>>>>>>>> First, the optimizer will optimize by doing duplicate nodes.
>>>>>>>>>>> Second, after SQL optimization, if the optimized plan still has
>>>>>>>>>>> duplicate nodes, the planner will automatically reuse them.
>>>>>>>>>>> There are config options to control whether we should reuse
>>>>>>>>>>> plan, their default value is true. So you don't need modify them.
>>>>>>>>>>> - table.optimizer.reuse-sub-plan-enabled
>>>>>>>>>>> - table.optimizer.reuse-source-enabled
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Terry and Jingsong,
>>>>>>>>>>>>
>>>>>>>>>>>> Currently I'm on 1.8 version using Flink planner for stream
>>>>>>>>>>>> proessing, I'll switch to 1.9 version to try out blink planner.
>>>>>>>>>>>>
>>>>>>>>>>>> Could you please point me to any examples (Java preferred)
>>>>>>>>>>>> using SubplanReuser?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> RK
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <
>>>>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi RKandoji,
>>>>>>>>>>>>>
>>>>>>>>>>>>> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>>>>>>>>>>>>>
>>>>>>>>>>>>>        Join                      Join
>>>>>>>>>>>>>      /      \                  /      \
>>>>>>>>>>>>>  Filter1  Filter2          Filter1  Filter2
>>>>>>>>>>>>>     |        |        =>       \     /
>>>>>>>>>>>>>  Project1 Project2            Project1
>>>>>>>>>>>>>     |        |                   |
>>>>>>>>>>>>>   Scan1    Scan2               Scan1
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <
>>>>>>>>>>>>> zjuwa...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi RKandoji~
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could you provide more info about your poc environment?
>>>>>>>>>>>>>> Stream or batch? Flink planner or blink planner?
>>>>>>>>>>>>>> AFAIK, blink planner has done some optimization to deal such
>>>>>>>>>>>>>> duplicate task for one same query. You can have a try with blink 
>>>>>>>>>>>>>> planner :
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Terry Wang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2019年12月30日 03:07,RKandoji <rkand...@gmail.com> 写道:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm doing a POC with flink to understand if it's a good fit
>>>>>>>>>>>>>> for my use case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As part of the process, I need to filter duplicate items and
>>>>>>>>>>>>>> created below query to get only the latest records based on 
>>>>>>>>>>>>>> timestamp. For
>>>>>>>>>>>>>> instance, I have "Users" table which may contain multiple 
>>>>>>>>>>>>>> messages for the
>>>>>>>>>>>>>> same "userId". So I wrote below query to get only the latest 
>>>>>>>>>>>>>> message for a
>>>>>>>>>>>>>> given "userId"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE
>>>>>>>>>>>>>> (userId, userTimestamp) IN (SELECT userId, MAX(userTimestamp) 
>>>>>>>>>>>>>> FROM Users
>>>>>>>>>>>>>> GROUP BY userId)");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The above query works as expected and contains only the
>>>>>>>>>>>>>> latest users based on timestamp.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The issue is when I use "uniqueUsers" table multiple times in
>>>>>>>>>>>>>> a JOIN operation, I see multiple tasks in the flink dashboard 
>>>>>>>>>>>>>> for the same
>>>>>>>>>>>>>> query that is creating "uniqueUsers" table. It is simply 
>>>>>>>>>>>>>> creating as many
>>>>>>>>>>>>>> tasks as many times I'm using the table.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Below is the JOIN query.
>>>>>>>>>>>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>>>>>>>>>>>> Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
>>>>>>>>>>>>>>                                        LEFT JOIN
>>>>>>>>>>>>>> uniqueUsersTbl aa ON c.userId = aa.userId
>>>>>>>>>>>>>>                                        LEFT JOIN
>>>>>>>>>>>>>> uniqueUsersTbl ab ON c.ownerId = ab.userId
>>>>>>>>>>>>>>                                        LEFT JOIN
>>>>>>>>>>>>>> uniqueUsersTbl ac ON c.sellerId = ac.userId
>>>>>>>>>>>>>>                                        LEFT JOIN
>>>>>>>>>>>>>> uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could someone please help me understand how I can avoid these
>>>>>>>>>>>>>> duplicate tasks?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> R Kandoji
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best, Jingsong Lee
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>

Reply via email to