Hello Eno,

I have to think this approach through. I could split the messages using the source attribute. However, one challenge is that I would need to do many joins. The example I gave is simplified: the real problem has about 10 sources of data, and there are various possible matches (A with B, B with C, A with D, C with E, and so on). Furthermore, there might be several match-keys for matching A with B.
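To make the suggested split concrete, here is roughly what I picture (only a sketch; the Event type, its getSource() accessor, and the topic names are invented for illustration, and serde configuration is omitted):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class SplitBySource {

        // Hypothetical normalized record; the real payload looks different.
        public static class Event {
            String source;   // e.g. "A", "B", ...
            String payload;
            String getSource() { return source; }
        }

        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // One normalized input topic; assumes default serdes are set up.
            KStream<String, Event> input = builder.stream("normalized-input");

            // Route each record to a per-source branch via its source field.
            @SuppressWarnings("unchecked")
            KStream<String, Event>[] branches = input.branch(
                (key, e) -> "A".equals(e.getSource()),
                (key, e) -> "B".equals(e.getSource()),
                (key, e) -> true   // catch-all for the remaining sources
            );

            branches[0].to("source-a");
            branches[1].to("source-b");
            branches[2].to("source-other");
            // With ~10 sources this becomes ~10 branches, plus one join per
            // match rule (and per match-key), which is what worries me.
        }
    }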
So, based on this additional information, I am wondering (a) whether a "multi-level join" is feasible in order to get the neighbors of the neighbors (correlate A with E via C), and (b) how to cope with the fact that there are multiple possible match-keys for linking two sources.

I am not sure whether I am in the right mindset and thinking in a "streaming way". The algorithm I have in mind is based on a graph representation of the problem: each message is a node, each match-key is a node, and edges connect the messages with their match-keys. The message nodes are then connected through the match-key nodes, and each entity is defined by the subgraph that connects all messages that are linked together.
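Put differently, that linkage is incremental connected components, which could be maintained with a union-find structure. A minimal sketch, outside of Kafka (plain Java; eventId and the "event:"/"key:" node-name prefixes are just placeholders):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Every message and every match-key is a node; union-ing a message with
    // its match-keys means two messages land in the same component iff they
    // are connected through a chain of shared match-keys. This also gives
    // the neighbors-of-neighbors transitivity (A - C - E) automatically.
    public class EntityLinker {

        private final Map<String, String> parent = new HashMap<>();

        private String find(String node) {
            parent.putIfAbsent(node, node);
            String root = parent.get(node);
            if (!root.equals(node)) {
                root = find(root);
                parent.put(node, root);   // path compression
            }
            return root;
        }

        private void union(String a, String b) {
            parent.put(find(a), find(b));
        }

        // Link one incoming message to all of its computed match-keys.
        public void link(String eventId, List<String> matchKeys) {
            for (String key : matchKeys) {
                union("event:" + eventId, "key:" + key);
            }
        }

        // Two events belong to the same entity iff their roots coincide.
        public boolean sameEntity(String a, String b) {
            return find("event:" + a).equals(find("event:" + b));
        }
    }

What I cannot see yet is how to maintain such a structure as partitioned Kafka Streams state, since a single union may connect events that live on different partitions.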
Kind regards,
Wladislaw

> Eno Thereska <eno.there...@gmail.com> wrote on 12 July 2017 at 00:23:
>
> Hi Wladislaw,
>
> Would splitting the one topic into multiple topics be acceptable at all?
> E.g., you could use the "branch" function in the DSL to split the
> messages and send them to different topics. Then, once you have multiple
> topics, you can do the joins etc.
>
> Thoughts?
>
> Thanks
> Eno
>
>
> > On 11 Jul 2017, at 05:02, Wladislaw Mitzel <mit...@tawadi.de> wrote:
> >
> > Hi all. How would one approach the following scenario with Kafka
> > Streams?
> >
> > There is one input topic. It has data from different sources in a
> > normalized format. There is a need to join records that come from
> > different sources but are linked to the same entity (record linkage).
> > There is a deterministic rule set for calculating (composed)
> > "match-keys" for every incoming record, which allows the correlation
> > of records that are linked to the same entity.
> >
> > Example: there are events A (userid, first name, last name, ...),
> > B (username, location, ...) and C (location, weather-data, ...).
> > There is a set of rules in order to correlate A with B
> > (A.firstName + A.lastName = B.username) and B with C
> > (B.location = C.location). At the end, we want to get the whole graph
> > of correlated records.
> >
> > Constraints: the latency of the record linkage should be as low as
> > possible. The state stores should contain the messages of the last
> > 180 days for linkage. (We are talking about tens to hundreds of GB
> > of data.)
> >
> > I already implemented a solution with Spark + an external database.
> > I calculate the match-keys and then store mappings for event-id =>
> > list-of-match-keys, match-key => list-of-event-ids and event-id =>
> > event-payload in the database. By querying the database one can get
> > a graph of "event -> match-keys -> more events" and so on. I do the
> > querying in a loop until no new events are added. As a last step, I
> > read the payloads using the accumulated event-ids. However, this
> > solution has high latency because of the external database calls.
> > That’s why the idea of having KTables as local state stores sounds
> > so interesting to me.
> >
> > Now with Kafka Streams I would like to use the embedded state with
> > KTables, but I find it quite hard to come up with a solution. I
> > think what I want to do is a self-join on the incoming topic, which
> > is not yet supported by the DSL. I thought of using the Processor
> > API and implementing a solution very similar to the one I described
> > with Spark: using several state stores for the mapping of event =>
> > match-keys and match-key => events. Besides the fact that I don't
> > know how to address the partitioning (or whether I need a global
> > store), I am not sure whether this is the way one would go with
> > Kafka Streams.
> >
> > Another solution I could think of is a loop in the topology, so that
> > an event would flow several times through the loop (which again has
> > KTables for the mapping of event-id and match-key) until there are
> > no new matches. Are loops possible at all, and if so, are they a
> > good approach or should one avoid loops? At the end of the record
> > linkage process I’d like to have *one* message that contains the
> > payloads of all correlated events and is then processed by the
> > downstream processors. However, I can only think of solutions where
> > I need to do a flatMap() (a join for every match-key), so that there
> > is more than one message.
> >
> > Do you have any feedback or suggestions? Any examples that could
> > help?
> >
> > Kind regards,
> >
> > Wladislaw