Thanks everyone for the replies. @Jark,
This is helpful, my code is currently in 1.8 version and I'll upgrade the code to 1.9 and give it a try. Couple of follow-up questions: 1. I need to perform Deduplication on Task table as well. Would above query work well on two Deduplicated tables, "LatestUser" and "LatestTask"? 2. Would JOIN operation be triggered for all new messages on both "User" and "Task" table? Same question in different words, will JOIN operation be triggered for all upsert messages on "LatestUser" and "LatestTask" Thanks, Reva On Thu, Dec 19, 2019 at 9:50 PM Jark Wu <imj...@gmail.com> wrote: > Hi Eva, > > If I understand correctly, > 1) the user stream is a changelog stream which every record is a upsert > with a primary key, and you only want to join the latest one > 2) if the user record is updated, you want to re-trigger the join > (retract&update previous joined result) > > If this is your requirement, fortunately, this use case can be solved in > Flink SQL v1.9 with *blink planner*. > First, you can use Deduplicate[1] to convert the append stream to an > updating stream which keeps the last row. > And then, join Task stream with the deduplicated view. Below is the > example: > > > Register the following query as "LatestUser" view: > > SELECT * > FROM ( > SELECT *, ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY PROCTIME() > DESC) AS rn > FROM User > ) WHERE rn = 1 > > Join on the "LatestUser": > > SELECT * FROM Task t > LEFT JOIN LatestUser ua ON t.PrimaryAssignee = ua.UserID > LEFT JOIN LatestUser ub ON t.SecondaryAssignee = ub.UserID > LEFT JOIN LatestUser uc ON t.Manager = uc.UserID > > > Best, > Jark > > [1]: > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication > > > On Fri, 20 Dec 2019 at 09:53, Kurt Young <ykt...@gmail.com> wrote: > >> Hi Eva, >> >> Correct me If i'm wrong. You have an unbounded Task stream and you >> want to enrich the User info to the task event. Meanwhile, the User table >> is also changing by the time, so you basically want that when task event >> comes, join the latest data of User table and emit the results. Even if >> the >> User table changes again, you don't want to re-trigger the join operation >> which happened before and already emitted, right? >> >> Best, >> Kurt >> >> >> On Fri, Dec 20, 2019 at 12:33 AM Timo Walther <twal...@apache.org> wrote: >> >>> Hi Eva, >>> >>> I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL >>> always joins an incoming record with all previous arrived records. Maybe >>> Jark in CC has some idea? >>> >>> It might make sense to use the DataStream API instead with a connect() >>> and CoProcessFunction where you can simply put the latest row into state >>> and perform the joining and emission of a new row when required. >>> >>> Regards, >>> Timo >>> >>> >>> On 18.12.19 23:44, Eva Eva wrote: >>> > Hi Team, >>> > >>> > I'm trying Flink for the first time and encountered an issue that I >>> > would like to discuss and understand if there is a way to achieve my >>> use >>> > case with Flink. >>> > >>> > *Use case:* I need to perform unbounded stream joins on multiple data >>> > streams by listening to different Kafka topics. I have a scenario to >>> > join a column in a table with multiple columns in another table by >>> > avoiding duplicate joins. The main concern is that I'm not able to >>> avoid >>> > duplicate joins. >>> > >>> > *Issue: *Given the nature of data, it is possible to have updates over >>> > time, sent as new messages since Kafka is immutable. For a given key I >>> > would like to perform join only on the latest message, whereas >>> currently >>> > Flink performs join against all messages with the key (this is what >>> I'm >>> > calling as duplicate joins issue). >>> > Example: Say I have two Kafka streams "User" and "Task". And I want to >>> > join "User" with multiple columns in "Task". >>> > Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee" >>> and >>> > "Manager" in "Task". >>> > >>> > Assuming I created and registered DataStreams. >>> > Below is my query: >>> > >>> > SELECT * FROM Task t >>> > LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID >>> > LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID >>> > LEFT JOIN User uc ON t.Manager = uc.UserID >>> > >>> > Say I have 5 different messages in Kafka with UserID=1000, I don't >>> want >>> > to perform 5 joins instead I want to perform join with the only latest >>> > message with UserID=1000. Is there any way to achieve this without >>> using >>> > Temporal Table Functions? >>> > >>> > *I cannot use Temporal Table Functions because of below reasons:* >>> > 1. I need to trigger JOIN operation for every new message in Kafka. >>> > Whereas new messages in Temporal Table don't trigger JOIN operation. >>> > 2. I need to perform LEFT OUTER JOINS, whereas Temporal Table can only >>> > be used for INNER JOINS >>> > 3. From what I understand, JOIN in Temporal Table can only be >>> performed >>> > using Primary key, so I won't be able to Join more than one key. >>> > >>> > >>> > Could someone please help me with this? Please let me know if any of >>> the >>> > information is not clear or need more details. >>> > >>> > If this is not the correct email id, could you please point me to >>> the >>> > correct one. >>> > >>> > >>> > Thanks in advance! >>> >>>