Hi Eva,

If I understand correctly:

1) the User stream is a changelog stream in which every record is an upsert with a primary key, and you only want to join the latest one;
2) if a User record is updated, you want to re-trigger the join (retract and update the previously joined result).
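To make these two requirements concrete, here is a minimal, framework-free Python model. It is not Flink code, and the names `UpsertLeftJoin`, `on_task`, `on_user` and `materialize` are hypothetical. It keeps only the latest row per UserID (requirement 1). When that row changes, it retracts ("-") and re-emits ("+") every Task result already joined against the old row (requirement 2). Only one join column is modeled; the three-way join in this thread would apply the same logic once per column.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# (op, task_id, joined user name); op is "+" (insert) or "-" (retract).
Change = Tuple[str, str, Optional[str]]


class UpsertLeftJoin:
    """Hypothetical model of: Task LEFT JOIN (latest User row), with retraction."""

    def __init__(self) -> None:
        self.latest_user: Dict[int, str] = {}  # upsert state: latest row per UserID
        self.tasks_by_user: Dict[int, List[str]] = defaultdict(list)
        self.output: List[Change] = []

    def on_task(self, task_id: str, user_id: int) -> None:
        # LEFT JOIN: emit the task even when no User row has arrived yet.
        self.tasks_by_user[user_id].append(task_id)
        self.output.append(("+", task_id, self.latest_user.get(user_id)))

    def on_user(self, user_id: int, name: str) -> None:
        old = self.latest_user.get(user_id)
        self.latest_user[user_id] = name  # keep only the latest message
        for task_id in self.tasks_by_user[user_id]:
            # Re-trigger the join: retract the old result, emit the new one.
            self.output.append(("-", task_id, old))
            self.output.append(("+", task_id, name))


def materialize(changes: List[Change]) -> List[Tuple[str, Optional[str]]]:
    """Apply a changelog to get the rows currently visible downstream."""
    live: List[Tuple[str, Optional[str]]] = []
    for op, task_id, name in changes:
        if op == "+":
            live.append((task_id, name))
        else:
            live.remove((task_id, name))
    return live


join = UpsertLeftJoin()
for version in range(1, 6):       # five messages with UserID=1000
    join.on_user(1000, f"user-v{version}")
join.on_task("t1", 1000)          # joins only the latest of the five
join.on_user(1000, "user-v6")     # the update re-triggers the join for t1
print(join.output)
print(materialize(join.output))   # [('t1', 'user-v6')]
```

The five User messages produce a single joined row, and the later update shows up as a retract/insert pair rather than as a second, duplicate result.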
If this is your requirement, then fortunately it can be solved in Flink SQL v1.9 with the *blink planner*. First, you can use Deduplication [1] to convert the append stream into an updating stream that keeps only the last row per key. Then, join the Task stream with the deduplicated view. Below is an example.

Register the following query as the "LatestUser" view:

SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY PROCTIME() DESC) AS rn
  FROM User
) WHERE rn = 1

Join on "LatestUser":

SELECT * FROM Task t
LEFT JOIN LatestUser ua ON t.PrimaryAssignee = ua.UserID
LEFT JOIN LatestUser ub ON t.SecondaryAssignee = ub.UserID
LEFT JOIN LatestUser uc ON t.Manager = uc.UserID

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication

On Fri, 20 Dec 2019 at 09:53, Kurt Young <ykt...@gmail.com> wrote:

> Hi Eva,
>
> Correct me if I'm wrong. You have an unbounded Task stream and you
> want to enrich each task event with User info. Meanwhile, the User table
> also changes over time, so you basically want each task event, when it
> arrives, to be joined with the latest data in the User table and the
> result emitted. Even if the User table changes again, you don't want to
> re-trigger a join that has already happened and been emitted, right?
>
> Best,
> Kurt
>
>
> On Fri, Dec 20, 2019 at 12:33 AM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Eva,
>>
>> I'm not 100% sure whether your use case can be solved with SQL. A JOIN in
>> SQL always joins an incoming record with all previously arrived records.
>> Maybe Jark in CC has some idea?
>>
>> It might make sense to use the DataStream API instead, with connect()
>> and a CoProcessFunction, where you can simply put the latest row into
>> state and perform the join and emit a new row when required.
>>
>> Regards,
>> Timo
>>
>>
>> On 18.12.19 23:44, Eva Eva wrote:
>> > Hi Team,
>> >
>> > I'm trying Flink for the first time and have encountered an issue that
>> > I would like to discuss, to understand whether there is a way to
>> > achieve my use case with Flink.
>> >
>> > *Use case:* I need to perform unbounded stream joins on multiple data
>> > streams by listening to different Kafka topics. In one scenario I need
>> > to join a column in one table with multiple columns in another table
>> > while avoiding duplicate joins. The main concern is that I'm not able
>> > to avoid duplicate joins.
>> >
>> > *Issue:* Given the nature of the data, records can be updated over
>> > time, and the updates are sent as new messages since Kafka is
>> > immutable. For a given key I would like to perform the join only on
>> > the latest message, whereas currently Flink performs the join against
>> > all messages with that key (this is what I'm calling the duplicate
>> > joins issue).
>> > Example: Say I have two Kafka streams, "User" and "Task", and I want
>> > to join "User" with multiple columns in "Task":
>> > join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee"
>> > and "Manager" in "Task".
>> >
>> > Assuming I have created and registered the DataStreams, below is my
>> > query:
>> >
>> > SELECT * FROM Task t
>> > LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
>> > LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
>> > LEFT JOIN User uc ON t.Manager = uc.UserID
>> >
>> > Say I have 5 different messages in Kafka with UserID=1000. I don't
>> > want to perform 5 joins; instead I want to join only with the latest
>> > message with UserID=1000. Is there any way to achieve this without
>> > using Temporal Table Functions?
>> >
>> > *I cannot use Temporal Table Functions for the following reasons:*
>> > 1. I need to trigger the JOIN operation for every new message in
>> > Kafka, whereas new messages in a Temporal Table don't trigger the
>> > JOIN operation.
>> > 2. I need to perform LEFT OUTER JOINs, whereas a Temporal Table can
>> > only be used for INNER JOINs.
>> > 3. From what I understand, a JOIN with a Temporal Table can only be
>> > performed on the primary key, so I won't be able to join on more than
>> > one key.
>> >
>> >
>> > Could someone please help me with this? Please let me know if any of
>> > the information is not clear or if you need more details.
>> >
>> > If this is not the correct email address, could you please point me
>> > to the correct one.
>> >
>> >
>> > Thanks in advance!
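Kurt's reading of the requirement and Timo's connect() + CoProcessFunction suggestion describe a different semantics. Each task is enriched with the latest User row at the moment it arrives, and later User updates do not re-trigger rows that were already emitted. The following is a minimal Python model of that idea, not the Flink API. `LatestUserEnricher` and its methods are hypothetical stand-ins for `processElement1`/`processElement2` and the function's state. `regular_join` reproduces the duplicate-join behaviour from the original question for contrast. One assumption to flag: this model uses a single global dictionary. In a real keyed CoProcessFunction, state is partitioned by the key used in keyBy, so a lookup on three different Task columns would likely need three chained keyed steps (or another distribution strategy) rather than one function.

```python
from typing import Dict, List, Optional, Tuple

# (task_id, PrimaryAssignee, SecondaryAssignee, Manager)
Task = Tuple[str, int, int, int]
Enriched = Tuple[str, Optional[str], Optional[str], Optional[str]]


def regular_join(user_messages: List[Tuple[int, str]], user_id: int) -> List[str]:
    """The duplicate-join problem: every User message with the key matches."""
    return [name for uid, name in user_messages if uid == user_id]


class LatestUserEnricher:
    """Hypothetical stand-in for a CoProcessFunction on connect()ed streams.

    on_user / on_task play the roles of processElement1 / processElement2,
    and latest_user plays the role of the function's state.
    """

    def __init__(self) -> None:
        self.latest_user: Dict[int, str] = {}
        self.output: List[Enriched] = []

    def on_user(self, user_id: int, name: str) -> None:
        # Overwrite the state: only the latest message per UserID is kept,
        # and already-emitted Task rows are NOT re-triggered.
        self.latest_user[user_id] = name

    def on_task(self, task: Task) -> None:
        task_id, primary, secondary, manager = task
        # LEFT-join-like lookup on three columns: unknown users become None.
        self.output.append((
            task_id,
            self.latest_user.get(primary),
            self.latest_user.get(secondary),
            self.latest_user.get(manager),
        ))


messages = [(1000, f"user-v{v}") for v in range(1, 6)]  # 5 messages, UserID=1000
print(len(regular_join(messages, 1000)))  # 5

enricher = LatestUserEnricher()
for uid, name in messages + [(2000, "boss")]:
    enricher.on_user(uid, name)
enricher.on_task(("t1", 1000, 3000, 2000))  # 3000 has no User row yet
enricher.on_user(1000, "user-v6")           # t1 is not re-emitted
enricher.on_task(("t2", 1000, 1000, 2000))
print(enricher.output)
```

The design trade-off is visible in the output. This model is append-only and never retracts, so t1 keeps "user-v5" after the update. The deduplicated-view approach in Jark's reply instead emits a retraction and an updated row whenever the User row changes.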