Hi Reva, I'm glad to hear it helps.
Quick answers to your questions:
1) Yes, it works. You can deduplicate the Task table in the same way using ROW_NUMBER().
2) Yes. It is a stream-stream join, which will be triggered by new messages from both sides.

Best,
Jark

On Sat, 28 Dec 2019 at 01:02, Eva Eva <eternalsunshine2...@gmail.com> wrote:

> Thanks everyone for the replies.
>
> @Jark,
>
> This is helpful. My code is currently on version 1.8; I'll upgrade it to 1.9 and give it a try.
>
> A couple of follow-up questions:
> 1. I need to perform deduplication on the Task table as well. Would the above query work well on two deduplicated tables, "LatestUser" and "LatestTask"?
> 2. Would the JOIN operation be triggered for all new messages on both the "User" and "Task" tables? The same question in different words: will the JOIN operation be triggered for all upsert messages on "LatestUser" and "LatestTask"?
>
> Thanks,
> Reva
>
> On Thu, Dec 19, 2019 at 9:50 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Eva,
>>
>> If I understand correctly:
>> 1) the user stream is a changelog stream in which every record is an upsert with a primary key, and you only want to join the latest one;
>> 2) if a user record is updated, you want to re-trigger the join (retract and update the previously joined result).
>>
>> If this is your requirement, fortunately, this use case can be solved in Flink SQL v1.9 with the *blink planner*.
>> First, you can use Deduplication [1] to convert the append stream into an updating stream which keeps the last row.
>> And then, join the Task stream with the deduplicated view.
>> Below is the example.
>>
>> Register the following query as the "LatestUser" view:
>>
>> SELECT *
>> FROM (
>>   SELECT *, ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY PROCTIME() DESC) AS rn
>>   FROM User
>> ) WHERE rn = 1
>>
>> Then join on "LatestUser":
>>
>> SELECT * FROM Task t
>> LEFT JOIN LatestUser ua ON t.PrimaryAssignee = ua.UserID
>> LEFT JOIN LatestUser ub ON t.SecondaryAssignee = ub.UserID
>> LEFT JOIN LatestUser uc ON t.Manager = uc.UserID
>>
>> Best,
>> Jark
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication
>>
>> On Fri, 20 Dec 2019 at 09:53, Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Hi Eva,
>>>
>>> Correct me if I'm wrong. You have an unbounded Task stream, and you want to enrich the task events with User info. Meanwhile, the User table is also changing over time, so you basically want that when a task event comes, it joins the latest data of the User table and emits the result. Even if the User table changes again, you don't want to re-trigger the join operation which happened before and was already emitted, right?
>>>
>>> Best,
>>> Kurt
>>>
>>> On Fri, Dec 20, 2019 at 12:33 AM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Eva,
>>>>
>>>> I'm not 100% sure your use case can be solved with SQL. A JOIN in SQL always joins an incoming record with all previously arrived records. Maybe Jark (in CC) has some idea?
>>>>
>>>> It might make sense to use the DataStream API instead, with connect() and a CoProcessFunction, where you can simply put the latest row into state and perform the joining and emission of a new row when required.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 18.12.19 23:44, Eva Eva wrote:
>>>> > Hi Team,
>>>> >
>>>> > I'm trying Flink for the first time and have encountered an issue that I would like to discuss, to understand whether there is a way to achieve my use case with Flink.
>>>> >
>>>> > *Use case:* I need to perform unbounded stream joins on multiple data streams by listening to different Kafka topics. I have a scenario where I join a column in one table with multiple columns in another table while avoiding duplicate joins. The main concern is that I'm not able to avoid duplicate joins.
>>>> >
>>>> > *Issue:* Given the nature of the data, it is possible to have updates over time, sent as new messages since Kafka is immutable. For a given key I would like to perform the join only on the latest message, whereas currently Flink performs the join against all messages with that key (this is what I'm calling the duplicate joins issue).
>>>> > Example: Say I have two Kafka streams, "User" and "Task", and I want to join "User" with multiple columns in "Task":
>>>> > join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee", and "Manager" in "Task".
>>>> >
>>>> > Assuming I have created and registered the DataStreams, below is my query:
>>>> >
>>>> > SELECT * FROM Task t
>>>> > LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
>>>> > LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
>>>> > LEFT JOIN User uc ON t.Manager = uc.UserID
>>>> >
>>>> > Say I have 5 different messages in Kafka with UserID=1000. I don't want to perform 5 joins; instead, I want to perform the join with only the latest message with UserID=1000. Is there any way to achieve this without using Temporal Table Functions?
>>>> >
>>>> > *I cannot use Temporal Table Functions for the reasons below:*
>>>> > 1. I need to trigger the JOIN operation for every new message in Kafka, whereas new messages in a Temporal Table don't trigger the JOIN operation.
>>>> > 2. I need to perform LEFT OUTER JOINs, whereas a Temporal Table can only be used for INNER JOINs.
>>>> > 3. From what I understand, a JOIN with a Temporal Table can only be performed using the primary key, so I won't be able to join on more than one key.
>>>> >
>>>> > Could someone please help me with this? Please let me know if any of the information is unclear or if you need more details.
>>>> >
>>>> > If this is not the correct email address, could you please point me to the correct one?
>>>> >
>>>> > Thanks in advance!
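[Editor's note] The ROW_NUMBER() deduplication pattern Jark suggests can be tried outside of Flink. Below is a minimal standalone sketch using Python's sqlite3 (SQLite 3.25+ for window functions). SQLite has no PROCTIME(), so a hypothetical event_time column stands in for processing time; the table and column names simply mirror the thread's example and are not part of any real schema.

```python
import sqlite3

# Illustrative data: several upserts for the same UserID arriving over time.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE User (UserID INTEGER, Name TEXT, event_time INTEGER)")
conn.executemany(
    "INSERT INTO User VALUES (?, ?, ?)",
    [
        (1000, "Alice v1", 1),
        (1000, "Alice v2", 2),  # update for the same key
        (2000, "Bob",      1),
        (1000, "Alice v3", 3),  # latest row for UserID=1000
    ],
)

# Keep only the last row per key: partition by UserID, order newest first,
# keep rank 1 -- the same shape as the "LatestUser" view in the thread.
rows = conn.execute(
    """
    SELECT UserID, Name FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY UserID ORDER BY event_time DESC
        ) AS rn
        FROM User
    ) WHERE rn = 1
    ORDER BY UserID
    """
).fetchall()
print(rows)  # [(1000, 'Alice v3'), (2000, 'Bob')]
```

Joining Task rows against this deduplicated view then touches only the latest message per UserID, which is exactly what the proposed "LatestUser" view achieves in Flink.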
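[Editor's note] Jark's second answer says the deduplicated join is a stream-stream join triggered by new messages from both sides, retracting and updating previously joined results. The sketch below is not Flink's actual retraction protocol, just a minimal Python illustration of that behavior: each side keeps only the latest row per key, and an upsert on the User side retracts the previously emitted result ("-") and emits the updated one ("+"). All names are illustrative; it handles only one assignee column and ignores task updates for brevity.

```python
latest_user = {}   # UserID -> latest user row (keep-last-row state)
latest_task = {}   # TaskID -> latest task row
emitted = []       # changelog of (+/-, TaskID, joined user name) records

def on_task(task):
    """A new task joins against the latest user (None-padded, like a LEFT JOIN)."""
    latest_task[task["TaskID"]] = task
    user = latest_user.get(task["PrimaryAssignee"])
    emitted.append(("+", task["TaskID"], user["Name"] if user else None))

def on_user(user):
    """An upsert on the user side re-triggers the join for affected tasks."""
    old = latest_user.get(user["UserID"])
    latest_user[user["UserID"]] = user
    for task in latest_task.values():
        if task["PrimaryAssignee"] == user["UserID"]:
            # Retract the previously emitted result (None-padded if no user
            # row had arrived yet), then emit the updated one.
            emitted.append(("-", task["TaskID"], old["Name"] if old else None))
            emitted.append(("+", task["TaskID"], user["Name"]))

on_task({"TaskID": 1, "PrimaryAssignee": 1000})
on_user({"UserID": 1000, "Name": "Alice"})
on_user({"UserID": 1000, "Name": "Alice v2"})
print(emitted)
# [('+', 1, None), ('-', 1, None), ('+', 1, 'Alice'),
#  ('-', 1, 'Alice'), ('+', 1, 'Alice v2')]
```

Note how each user upsert produces a retraction/update pair rather than an extra join result, which is why deduplicating both sides avoids the "duplicate joins" Eva describes.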