[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742729#comment-16742729
 ] 

John Roesler commented on KAFKA-7497:
-------------------------------------

Note, the use-case reference above is now at: 
[https://github.com/confluentinc/demo-scene/blob/master/ksql-atm-fraud-detection/ksql-atm-fraud-detection-README.adoc]

 

I _think_ I see the rationale of this ask:

I guess the difference between a windowed aggregation and a self-join is that 
the windowed aggregation would require you to save all the occurrences of the 
key in the window and do your computation (which is a pairwise computation) 
over the collection, whereas the self-join naturally gives you a stream of all 
the pairwise matches when the same key re-occurs within the window in the 
stream.

IIUC, you can do the same computation either way, but it's more naturally 
expressed as a pairwise comparison, so the self-join is more ergonomic? 
(Although, if there are two fraudulent transactions on the same account, they 
would show up in the existing program as two independent potential frauds, 
whereas the aggregation method gives you the opportunity to generate just one 
fraud report with two occurrences.)
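
To make the two shapes concrete, here is a minimal sketch in plain Java. It is not the Kafka Streams API and not the code from the linked demo; the {{Txn}} class, the method names, and the tumbling-window bucketing in {{aggregate}} are all assumptions made for illustration. The self-join view emits one result per matching pair, while the aggregation view collects the occurrences and reports once per key and window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PairwiseVsAggregate {
    // Hypothetical event shape: account id (the join key), event time in ms, and a label.
    static final class Txn {
        final String account;
        final long ts;
        final String id;
        Txn(String account, long ts, String id) {
            this.account = account;
            this.ts = ts;
            this.id = id;
        }
    }

    // Self-join view: one output per pair of same-key events within windowMs of each other.
    static List<String> selfJoin(List<Txn> stream, long windowMs) {
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < stream.size(); i++) {
            for (int j = i + 1; j < stream.size(); j++) {
                Txn a = stream.get(i);
                Txn b = stream.get(j);
                if (a.account.equals(b.account) && Math.abs(a.ts - b.ts) <= windowMs) {
                    pairs.add(a.id + "~" + b.id);
                }
            }
        }
        return pairs;
    }

    // Aggregation view: collect all occurrences per key and tumbling window,
    // then keep only the groups with more than one occurrence.
    // Simplification: pairs that straddle a window boundary are not grouped together.
    static Map<String, List<String>> aggregate(List<Txn> stream, long windowMs) {
        Map<String, List<String>> byKeyAndWindow = new LinkedHashMap<>();
        for (Txn t : stream) {
            long windowStart = (t.ts / windowMs) * windowMs;
            byKeyAndWindow
                .computeIfAbsent(t.account + "@" + windowStart, k -> new ArrayList<>())
                .add(t.id);
        }
        byKeyAndWindow.values().removeIf(ids -> ids.size() < 2);
        return byKeyAndWindow;
    }

    public static void main(String[] args) {
        List<Txn> stream = List.of(
            new Txn("A", 0, "t1"), new Txn("A", 100, "t2"),
            new Txn("B", 150, "t3"), new Txn("A", 200, "t4"));
        System.out.println(selfJoin(stream, 600));   // three pairs for account A
        System.out.println(aggregate(stream, 600));  // one group for account A
    }
}
```

With three transactions on one account, the self-join view yields three independent pairs, whereas the aggregation view yields a single group containing all three occurrences.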

 

On the other hand, I'm struggling to see the semantics of this feature clearly.

As I understand it, the semantics of a stream-stream join in general is that 
you have a stream of unique events U=<u1, u2, u3> and another stream of unique 
events V=<v1, v2, v3>, and you effectively want to "zip" them to produce 
J=<(u1,v1), (u2,v2), (u3,v3)>. Note that the indices are the identifying key, 
and they are unique. The standard stream processing complications apply: one 
side of a pair may be arbitrarily delayed or disordered, which leads to the 
need for memory on one or both sides of the join.
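
A rough sketch of that "zip" reading, in plain Java rather than the Kafka Streams API: each side buffers an event until its partner arrives. The class and method names are hypothetical, and dropping both events once they are paired is an assumption that only holds if keys really are unique. Windowing and expiry of unmatched events are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UniqueKeyJoin {
    // Events that have arrived on one side and are still waiting for their partner.
    private final Map<String, String> pendingLeft = new HashMap<>();
    private final Map<String, String> pendingRight = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // An event from stream U arrives.
    void onLeft(String key, String value) {
        String match = pendingRight.remove(key);
        if (match != null) {
            output.add("(" + value + "," + match + ")"); // pair produced, nothing retained
        } else {
            pendingLeft.put(key, value);                  // partner is late, remember this side
        }
    }

    // An event from stream V arrives.
    void onRight(String key, String value) {
        String match = pendingLeft.remove(key);
        if (match != null) {
            output.add("(" + match + "," + value + ")");
        } else {
            pendingRight.put(key, value);
        }
    }

    List<String> output() {
        return output;
    }

    int buffered() {
        return pendingLeft.size() + pendingRight.size();
    }

    public static void main(String[] args) {
        UniqueKeyJoin join = new UniqueKeyJoin();
        join.onLeft("1", "u1");
        join.onRight("2", "v2");   // arrives before its partner u2
        join.onRight("1", "v1");
        join.onLeft("2", "u2");
        System.out.println(join.output() + " buffered=" + join.buffered());
    }
}
```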

The use case described in the document linked above seems different; the 
identifying information isn't unique, in that the same account id appears on an 
arbitrary number of events in the stream, but the account id is what we use as 
the joining key. It sounds similar to the "streaming similarity self-join" 
problem, which I found while researching this issue: 
[http://www.vldb.org/pvldb/vol9/p792-defranciscimorales.pdf]. Under that 
definition, the "join" is actually just a cartesian product of every record in 
both streams, and for each pair, you compute a similarity, keeping (or 
discarding) only the pairs that have an above-threshold similarity score. There 
are a number of relaxations/optimizations required to do this efficiently, 
which in our case means limiting the product to only those records that already 
share the same account id, and of course limiting the join temporally.

 

I suppose my question, after looking into this, is: despite the shared name and 
the fact that the existing implementation happens to suit both, are these two 
operations really the same concept? If we say "yes", then we may 
prohibit future optimizations. For example, in the former stream-stream join, 
you know that the events' keys are unique, so once you produce a pair, you can 
immediately forget both of the input events for it. But for the 
similarity-join, you have to remember the input events until the pre-defined 
join window closes.
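
For contrast, a minimal sketch of the key- and time-limited self-join with non-unique keys, again in plain Java with hypothetical names. It assumes events arrive in timestamp order per key, and it only evicts when a new event for the same key arrives; both are simplifications. The point is that an event cannot be dropped after its first match, because later events on the same key may still pair with it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WindowedSelfJoin {
    // Hypothetical retained event: event time in ms and a label.
    static final class Event {
        final long ts;
        final String id;
        Event(long ts, String id) {
            this.ts = ts;
            this.id = id;
        }
    }

    private final long windowMs;
    private final Map<String, Deque<Event>> retained = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    WindowedSelfJoin(long windowMs) {
        this.windowMs = windowMs;
    }

    // Assumes per-key events arrive in timestamp order.
    void onEvent(String key, long ts, String id) {
        Deque<Event> buffer = retained.computeIfAbsent(key, k -> new ArrayDeque<>());
        // Only events whose join window has closed can be forgotten.
        while (!buffer.isEmpty() && buffer.peekFirst().ts < ts - windowMs) {
            buffer.pollFirst();
        }
        // Pair the new event with every retained event on the same key.
        // A similarity or fraud predicate would filter the pairs here.
        for (Event earlier : buffer) {
            output.add(earlier.id + "~" + id);
        }
        buffer.addLast(new Event(ts, id));
    }

    List<String> output() {
        return output;
    }

    int retainedCount(String key) {
        Deque<Event> buffer = retained.get(key);
        return buffer == null ? 0 : buffer.size();
    }

    public static void main(String[] args) {
        WindowedSelfJoin join = new WindowedSelfJoin(100);
        join.onEvent("A", 0, "t1");
        join.onEvent("A", 50, "t2");
        join.onEvent("A", 90, "t3");
        join.onEvent("A", 160, "t4"); // t1 and t2 are outside the window by now
        System.out.println(join.output() + " retained=" + join.retainedCount("A"));
    }
}
```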

 

> Kafka Streams should support self-join on streams
> -------------------------------------------------
>
>                 Key: KAFKA-7497
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7497
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Robin Moffatt
>            Priority: Major
>              Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
