Hi Eva,

If I understand correctly:
1) the User stream is a changelog stream in which every record is an upsert
with a primary key, and you only want to join against the latest one
2) if a User record is updated, you want to re-trigger the join
(retract and update the previously joined result)

If this is your requirement, this use case can fortunately be solved in
Flink SQL v1.9 with the *blink planner*.
First, you can use Deduplicate[1] to convert the append stream into an
updating stream that keeps only the last row per key.
Then, join the Task stream with the deduplicated view. Below is an example:


Register the following query as "LatestUser" view:

SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY PROCTIME() DESC) AS rn
  FROM User
) WHERE rn = 1

Join on the "LatestUser":

 SELECT * FROM Task t
   LEFT JOIN LatestUser ua ON t.PrimaryAssignee = ua.UserID
   LEFT JOIN LatestUser ub ON t.SecondaryAssignee = ub.UserID
   LEFT JOIN LatestUser uc ON t.Manager = uc.UserID
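
To make the expected behaviour concrete, here is a minimal plain-Python
sketch of the semantics described above. It is only a toy model, not how
Flink implements the join: the class name, the "TaskID" column and the
user "name" value are hypothetical, and it emits one retract/insert pair
per affected task, whereas the real cascaded joins may emit more
intermediate changes.

```python
ASSIGNEE_COLUMNS = ("PrimaryAssignee", "SecondaryAssignee", "Manager")


class LatestUserJoin:
    """Toy model of Task LEFT JOIN LatestUser (three times)."""

    def __init__(self):
        self.latest_user = {}  # UserID -> latest name (the deduplicated view)
        self.tasks = []        # every task row seen so far (join state)

    def _joined(self, task):
        # LEFT JOIN: an unknown UserID yields None, the task is still emitted.
        names = [self.latest_user.get(task[c]) for c in ASSIGNEE_COLUMNS]
        return (task["TaskID"], *names)

    def on_task(self, task):
        """A new task is joined once against the latest user rows."""
        self.tasks.append(task)
        return [("+", self._joined(task))]

    def on_user(self, user_id, name):
        """A user upsert retracts and re-emits every affected joined row."""
        affected = [
            t for t in self.tasks
            if user_id in [t[c] for c in ASSIGNEE_COLUMNS]
        ]
        before = [self._joined(t) for t in affected]
        self.latest_user[user_id] = name  # keep only the latest row per key
        after = [self._joined(t) for t in affected]

        changes = []
        for old, new in zip(before, after):
            if old != new:
                changes.append(("-", old))  # retract previous result
                changes.append(("+", new))  # emit updated result
        return changes


if __name__ == "__main__":
    join = LatestUserJoin()
    join.on_user(1000, "Ann")
    task = {"TaskID": 1, "PrimaryAssignee": 1000,
            "SecondaryAssignee": 2000, "Manager": 1000}
    print(join.on_task(task))
    print(join.on_user(1000, "Anna"))
```

Each task is joined against at most one row per UserID, and a later update
to that user replaces the earlier joined result instead of adding another
one.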


Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication


On Fri, 20 Dec 2019 at 09:53, Kurt Young <ykt...@gmail.com> wrote:

> Hi Eva,
>
> Correct me if I'm wrong. You have an unbounded Task stream and you
> want to enrich each task event with User info. Meanwhile, the User table
> also changes over time, so you basically want each task event, when it
> arrives, to be joined with the latest data of the User table and the
> result emitted. Even if the User table changes again, you don't want to
> re-trigger a join that already happened and was already emitted, right?
>
> Best,
> Kurt
>
>
> On Fri, Dec 20, 2019 at 12:33 AM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Eva,
>>
>> I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL
>> always joins an incoming record with all previously arrived records. Maybe
>> Jark in CC has some idea?
>>
>> It might make sense to use the DataStream API instead with a connect()
>> and CoProcessFunction where you can simply put the latest row into state
>> and perform the joining and emission of a new row when required.
>>
>> Regards,
>> Timo
>>
>>
>> On 18.12.19 23:44, Eva Eva wrote:
>> > Hi Team,
>> >
>> > I'm trying Flink for the first time and encountered an issue that I
>> > would like to discuss and understand if there is a way to achieve my
>> > use case with Flink.
>> >
>> > *Use case:* I need to perform unbounded stream joins on multiple data
>> > streams by listening to different Kafka topics. I have a scenario where
>> > I join a column in one table with multiple columns in another table
>> > while avoiding duplicate joins. The main concern is that I'm not able
>> > to avoid duplicate joins.
>> >
>> > *Issue:* Given the nature of the data, it is possible to have updates
>> > over time, sent as new messages since Kafka messages are immutable. For
>> > a given key I would like to perform the join only on the latest message,
>> > whereas currently Flink performs the join against all messages with that
>> > key (this is what I'm calling the duplicate joins issue).
>> > Example: Say I have two Kafka streams "User" and "Task". And I want to
>> > join "User" with multiple columns in "Task".
>> > Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee" and
>> > "Manager" in "Task".
>> >
>> > Assuming I created and registered DataStreams.
>> > Below is my query:
>> >
>> >    SELECT * FROM Task t
>> >     LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
>> >     LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
>> >     LEFT JOIN User uc ON t.Manager = uc.UserID
>> >
>> > Say I have 5 different messages in Kafka with UserID=1000. I don't want
>> > to perform 5 joins; instead, I want to join with only the latest
>> > message with UserID=1000. Is there any way to achieve this without
>> > using Temporal Table Functions?
>> >
>> > *I cannot use Temporal Table Functions for the reasons below:*
>> > 1. I need to trigger the JOIN operation for every new message in Kafka,
>> > whereas new messages in a Temporal Table don't trigger the JOIN operation.
>> > 2. I need to perform LEFT OUTER JOINs, whereas a Temporal Table can only
>> > be used for INNER JOINs.
>> > 3. From what I understand, a JOIN on a Temporal Table can only be
>> > performed using the primary key, so I won't be able to join on more
>> > than one key.
>> >
>> >
>> > Could someone please help me with this? Please let me know if any of
>> > the information is not clear or needs more detail.
>> >
>> > If this is not the correct email address, could you please point me to
>> > the correct one.
>> >
>> >
>> > Thanks in advance!
>>
>>
