Hi Reva,

I'm glad to see it can help you.

Quick answers for your questions:
1) Yes, it works. You can deduplicate Task table in the same way using
ROW_NUMBER().
2) Yes. It is a stream-stream join which will be triggered for new messages
from both sides.

Best,
Jark


On Sat, 28 Dec 2019 at 01:02, Eva Eva <eternalsunshine2...@gmail.com> wrote:

> Thanks everyone for the replies.
>
> @Jark,
>
> This is helpful, my code is currently in 1.8 version and I'll upgrade the
> code to 1.9 and give it a try.
>
> Couple of follow-up questions:
> 1. I need to perform Deduplication on Task table as well. Would above
> query work well on two Deduplicated tables, "LatestUser" and "LatestTask"?
> 2. Would JOIN operation be triggered for all new messages on both "User"
> and "Task" table? Same question in different words, will JOIN operation be
> triggered for all upsert messages on "LatestUser" and "LatestTask"
>
> Thanks,
> Reva
>
>
> On Thu, Dec 19, 2019 at 9:50 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Eva,
>>
>> If I understand correctly,
>> 1) the user stream is a changelog stream which every record is a upsert
>> with a primary key, and you only want to join the latest one
>> 2) if the user record is updated, you want to re-trigger the join
>> (retract&update previous joined result)
>>
>> If this is your requirement, fortunately, this use case can be solved in
>> Flink SQL v1.9 with *blink planner*.
>> First, you can use Deduplicate[1] to convert the append stream to an
>> updating stream which keeps the last row.
>> And then, join Task stream with the deduplicated view. Below is the
>> example:
>>
>>
>> Register the following query as "LatestUser" view:
>>
>> SELECT *
>> FROM (
>>   SELECT *, ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY PROCTIME()
>> DESC) AS rn
>>   FROM User
>> ) WHERE rn = 1
>>
>> Join on the "LatestUser":
>>
>>  SELECT * FROM Task t
>>    LEFT JOIN LatestUser ua ON t.PrimaryAssignee = ua.UserID
>>    LEFT JOIN LatestUser ub ON t.SecondaryAssignee = ub.UserID
>>    LEFT JOIN LatestUser uc ON t.Manager = uc.UserID
>>
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication
>>
>>
>> On Fri, 20 Dec 2019 at 09:53, Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Hi Eva,
>>>
>>> Correct me If i'm wrong. You have an unbounded Task stream and you
>>> want to enrich the User info to the task event. Meanwhile, the User table
>>> is also changing by the time, so you basically want that when task event
>>> comes, join the latest data of User table and emit the results. Even if
>>> the
>>> User table changes again, you don't want to re-trigger the join
>>> operation
>>> which happened before and already emitted, right?
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Dec 20, 2019 at 12:33 AM Timo Walther <twal...@apache.org>
>>> wrote:
>>>
>>>> Hi Eva,
>>>>
>>>> I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL
>>>> always joins an incoming record with all previous arrived records.
>>>> Maybe
>>>> Jark in CC has some idea?
>>>>
>>>> It might make sense to use the DataStream API instead with a connect()
>>>> and CoProcessFunction where you can simply put the latest row into
>>>> state
>>>> and perform the joining and emission of a new row when required.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 18.12.19 23:44, Eva Eva wrote:
>>>> > Hi Team,
>>>> >
>>>> > I'm trying Flink for the first time and encountered an issue that I
>>>> > would like to discuss and understand if there is a way to achieve my
>>>> use
>>>> > case with Flink.
>>>> >
>>>> > *Use case:* I need to perform unbounded stream joins on multiple data
>>>> > streams by listening to different Kafka topics. I have a scenario to
>>>> > join a column in a table with multiple columns in another table by
>>>> > avoiding duplicate joins. The main concern is that I'm not able to
>>>> avoid
>>>> > duplicate joins.
>>>> >
>>>> > *Issue: *Given the nature of data, it is possible to have updates
>>>> over
>>>> > time, sent as new messages since Kafka is immutable. For a given key
>>>> I
>>>> > would like to perform join only on the latest message, whereas
>>>> currently
>>>> > Flink performs join against all messages with the key (this is what
>>>> I'm
>>>> > calling as duplicate joins issue).
>>>> > Example: Say I have two Kafka streams "User" and "Task". And I want
>>>> to
>>>> > join "User" with multiple columns in "Task".
>>>> > Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee"
>>>> and
>>>> > "Manager" in "Task".
>>>> >
>>>> > Assuming I created and registered DataStreams.
>>>> > Below is my query:
>>>> >
>>>> >    SELECT * FROM Task t
>>>> >     LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
>>>> >     LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
>>>> >     LEFT JOIN User uc ON t.Manager = uc.UserID
>>>> >
>>>> > Say I have 5 different messages in Kafka with UserID=1000, I don't
>>>> want
>>>> > to perform 5 joins instead I want to perform join with the only
>>>> latest
>>>> > message with UserID=1000. Is there any way to achieve this without
>>>> using
>>>> > Temporal Table Functions?
>>>> >
>>>> > *I cannot use Temporal Table Functions because of below reasons:*
>>>> > 1. I need to trigger JOIN operation for every new message in Kafka.
>>>> > Whereas new messages in Temporal Table don't trigger JOIN operation.
>>>> > 2. I need to perform LEFT OUTER JOINS, whereas Temporal Table can
>>>> only
>>>> > be used for INNER JOINS
>>>> > 3. From what I understand, JOIN in Temporal Table can only be
>>>> performed
>>>> > using Primary key, so I won't be able to Join more than one key.
>>>> >
>>>> >
>>>> > Could someone please help me with this? Please let me know if any of
>>>> the
>>>> > information is not clear or need more details.
>>>> >
>>>> >   If this is not the correct email id, could you please point me to
>>>> the
>>>> > correct one.
>>>> >
>>>> >
>>>> > Thanks in advance!
>>>>
>>>>

Reply via email to