Hello Eno,

I have to think this approach through. I could split the messages using the 
source attribute. However, one challenge is that I would need to do many 
joins. The example I gave is simplified: the real problem has about 10 
sources of data, and there are various possible matches (A with B, B with C, 
A with D, C with E, and so on). Furthermore, there might be several 
match-keys for matching A with B.
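
For what it's worth, here is a minimal sketch of the splitting you suggest, 
assuming a getSource() accessor on the value type (NormalizedRecord and the 
topic names are placeholders I made up):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, NormalizedRecord> input = builder.stream("normalized-input");

    // Route each message to a per-source stream based on its source attribute.
    KStream<String, NormalizedRecord>[] bySource = input.branch(
        (key, value) -> "A".equals(value.getSource()),
        (key, value) -> "B".equals(value.getSource()),
        (key, value) -> true  // catch-all for the remaining sources
    );

    bySource[0].to("source-a");
    bySource[1].to("source-b");
    bySource[2].to("source-other");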

So, based on this additional information, I am wondering:

a. whether a "multi-level join" is feasible in order to get the neighbors of 
the neighbors (correlate A with E using C)

b. how to cope with the fact that there can be multiple possible match-keys 
for linking two sources (see the sketch below).
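
To make (b) concrete, here is a minimal DSL sketch of fanning each record out 
to all of its match-keys with flatMap(), assuming the input stream from the 
sketch above; MatchKeyRules is a hypothetical helper standing in for the 
deterministic rule-set:

    import java.util.stream.Collectors;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStream;

    // One output record per (match-key, event) pair. MatchKeyRules is a
    // hypothetical helper that applies the deterministic rule-set.
    KStream<String, NormalizedRecord> byMatchKey = input.flatMap(
        (eventId, record) -> MatchKeyRules.keysFor(record).stream()
            .map(matchKey -> KeyValue.pair(matchKey, record))
            .collect(Collectors.toList())
    );
    // byMatchKey is now keyed by match-key, so records that share a
    // match-key end up in the same partition and can be grouped or joined.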

I am not sure whether I am in the right mindset and thinking in a "streaming 
way". The algorithm I have in mind is based on a graph representation of the 
problem: each message is a node, each match-key is a node, and edges connect 
the messages with their match-keys. The message nodes are then connected 
through the match-key nodes, and each entity is defined by the subgraph that 
connects all messages that are linked together.
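
As a thought experiment, here is a rough Processor API sketch of that graph 
walk, assuming related records have already been re-partitioned onto the same 
instance (or that global stores are used); NormalizedRecord, MatchKeyRules, 
LinkedEntity and the store names are all placeholders:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Rough sketch: two state stores act as the adjacency lists of the graph
    // (event-id -> match-keys, match-key -> event-ids). On each record we walk
    // the graph breadth-first until no new events are found and forward the
    // whole linked set downstream as a single message.
    public class LinkageProcessor extends AbstractProcessor<String, NormalizedRecord> {

        private KeyValueStore<String, Set<String>> keysByEvent;  // event-id -> match-keys
        private KeyValueStore<String, Set<String>> eventsByKey;  // match-key -> event-ids

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            keysByEvent = (KeyValueStore<String, Set<String>>) context.getStateStore("keys-by-event");
            eventsByKey = (KeyValueStore<String, Set<String>>) context.getStateStore("events-by-key");
        }

        @Override
        public void process(String eventId, NormalizedRecord record) {
            // MatchKeyRules is a hypothetical helper implementing the rule-set.
            Set<String> newKeys = MatchKeyRules.keysFor(record);
            keysByEvent.put(eventId, newKeys);
            for (String key : newKeys) {
                Set<String> events = eventsByKey.get(key);
                if (events == null) {
                    events = new HashSet<>();
                }
                events.add(eventId);
                eventsByKey.put(key, events);
            }

            // Breadth-first walk: event -> match-keys -> more events -> ...
            Set<String> linked = new HashSet<>();
            Deque<String> frontier = new ArrayDeque<>();
            frontier.add(eventId);
            while (!frontier.isEmpty()) {
                String current = frontier.poll();
                if (!linked.add(current)) {
                    continue;
                }
                Set<String> keys = keysByEvent.get(current);
                if (keys == null) {
                    continue;
                }
                for (String key : keys) {
                    Set<String> neighbours = eventsByKey.get(key);
                    if (neighbours != null) {
                        frontier.addAll(neighbours);
                    }
                }
            }

            // One message with all correlated event-ids (payload lookup omitted).
            context().forward(eventId, new LinkedEntity(linked));
        }
    }

The 180-day retention would still need separate handling (e.g. a scheduled 
punctuation that evicts old entries), which I have left out here.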

Kind regards,

Wladislaw

> 
>     Eno Thereska <eno.there...@gmail.com> wrote on 12 July 2017 at 00:23:
> 
>     Hi Wladislaw,
> 
>     Would splitting the one topic into multiple topics be acceptable at all? 
> E.g., you could use the "branch" function in the DSL to split the messages 
> and send them to different topics. Then, once you have multiple topics, you 
> can do the joins etc.
> 
>     Thoughts?
> 
>     Thanks
>     Eno
> 
>         > > 
> >         On 11 Jul 2017, at 05:02, Wladislaw Mitzel <mit...@tawadi.de> wrote:
> > 
> >         Hi all. How would one approach the following scenario with Kafka 
> > Streams?
> > 
> >         There is one input topic. It has data from different sources in a 
> > normalized format. There is a need to join records that come from different 
> > sources but are linked to the same entity (record linkage). There is a 
> > deterministic rule-set to calculate (composed) "match-keys" for every 
> > incoming record that allow the correlation of records that are linked to 
> > the same entity.
> > 
> >         Example: There are events A (userid, first name, last name, ...), 
> > B (username, location, ...) and C (location, weather-data, ...). There is a 
> > set of rules in order to correlate A with B (A.firstName+A.lastName = 
> > B.username) and B with C (B.location = C.location). At the end, we want to 
> > get the whole graph of correlated records.
> > 
> >         Constraints: The latency of the record linkage should be as low as 
> > possible. The state stores should contain the messages of the last 180 days 
> > for linkage. (We are talking about tens to hundreds of GB of data.)
> > 
> >         I already implemented a solution with spark + an external database. 
> > I calculate the match-keys and then store mappings for event-id => 
> > list-of-match-keys, match-key => list-of-event-ids and event-id => 
> > event-payload in the database. By querying the database one can get a graph 
> > of "event -> match-keys -> more events" and so on. I do the querying in a 
> > loop until there are no new events added. As a last step, I read the 
> > payloads using the accumulated event-ids. However, this solution has a high 
> > latency because of the external database calls. That’s why the idea of 
> > having KTables as local state stores sounds so interesting to me.
> > 
> >         Now with Kafka Streams I would like to use the embedded state with 
> > KTables, but I find it quite hard to come up with a solution. I think what 
> > I want to do is a self-join on the incoming topic, which is not yet 
> > supported by the DSL. I thought of using the Processor API to implement a 
> > solution very similar to the one I described with Spark: using several 
> > state stores for the mappings event => match-keys and match-key => events. 
> > Besides the fact that I don't know how to address the partitioning (or 
> > whether I need a global store), I am not sure whether this is the way one 
> > would go with Kafka Streams.
> > 
> >         Another solution I could think of is a loop in the topology, so 
> > that an event would flow through the loop several times (which again has 
> > KTables for the mapping of event-id and match-key) until there are no new 
> > matches. Are loops possible at all, and if so, is this a good approach or 
> > should one avoid loops? At the end of the record linkage process I’d like 
> > to have *one* message that contains the payloads of all correlated events 
> > and is then processed by the downstream processors. However, I can only 
> > think of solutions where I need to do a flatMap() (a join for every 
> > match-key), so that there is more than one message.
> > 
> >         Do you have any feedback or suggestions? Any examples that could 
> > help?
> > 
> >         Kind regards,
> > 
> >         Wladislaw
> > 
