1) There are three types of joins for a KTable-KTable join; they follow the same semantics as SQL joins:

KTable.join(KTable): when a new record is received from the outer table and there is no matching record in the inner table, there is no output; and vice versa.

KTable.leftJoin(KTable): when a new record is received from the outer table and there is no matching record in the inner table, the output is (a, null); in the other direction there is no output.

KTable.outerJoin(KTable): when a new record is received from the outer/inner table and there is no matching record in the inner/outer table, the output is (a, null) or (null, b).
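For illustration, a minimal sketch of the three variants, written against the current kafka-streams-scala DSL rather than the 0.10-era API used in this thread; the topic names and the string-formatting joiners are placeholders:

    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.kstream.KTable
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val builder = new StreamsBuilder()
    val left: KTable[String, String]  = builder.table[String, String]("left-topic")
    val right: KTable[String, String] = builder.table[String, String]("right-topic")

    // join: emits only when both sides have a value for the key
    val innerJoined: KTable[String, String] =
      left.join(right)((a, b) => s"($a, $b)")

    // leftJoin: b is null while the right side has no match, so guard it
    val leftJoined: KTable[String, String] =
      left.leftJoin(right)((a, b) => s"($a, ${Option(b).getOrElse("null")})")

    // outerJoin: either a or b may be null
    val outerJoined: KTable[String, String] =
      left.outerJoin(right)((a, b) =>
        s"(${Option(a).getOrElse("null")}, ${Option(b).getOrElse("null")})")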
2) The result topic is also a changelog topic; although it will be log-compacted on the key over time, if you consume it immediately the log may not yet be compacted.

Guozhang

On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
guillermo.lammers.cor...@tecsisa.com> wrote:

> Hi Guozhang,
>
> Thank you very much for your reply and sorry for the generic question; I'll
> try to explain with some pseudocode.
>
> I have two KTables with a join:
>
> val ktable1: KTable[String, String] = builder.table("topic1")
> val ktable2: KTable[String, String] = builder.table("topic2")
>
> val result: KTable[String, ResultUnion] =
>   ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1, data2))
>
> I send the result to a topic with result.to("resultTopic").
>
> My questions are related to the following scenario:
>
> - The streaming app is up & running without data in the topics.
>
> - I send data to "topic2", for example a key/value pair like ("uniqueKey1",
> "hello").
>
> - I see null values in topic "resultTopic", i.e. ("uniqueKey1", null).
>
> - If I send data to "topic1", for example a key/value pair like
> ("uniqueKey1", "world"), then I see this value in topic "resultTopic":
> ("uniqueKey1", ResultUnion("hello", "world")).
>
> Q: If we send data for one of the KTables that does not have the
> corresponding data by key in the other one, is obtaining null values in the
> final result topic the expected behavior?
>
> My next step would be to use Kafka Connect to persist the result data in C*
> (I have not read the Connector docs yet...); is this the way to do it? (I
> mean, prepare the data in the topic.)
>
> Q: On the other hand, just to try it, I have a KTable that reads the
> messages in "resultTopic" and prints them. If the stream is a KTable, I am
> wondering why it is getting all the values from the topic, even those with
> the same key?
>
> Thanks in advance! Great job answering the community!
>
> 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
>
> > Hi Guillermo,
> >
> > 1) Yes, in your case the streams are really "changelog" streams, hence
> > you should create them as KTables and do a KTable-KTable join.
> >
> > 2) Could you elaborate on "achieving this"? What behavior do you require
> > in the application logic?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral <
> > guillermo.lammers.cor...@tecsisa.com> wrote:
> >
> > > Hi,
> > >
> > > I am a newbie to Kafka Streams and I am using it to try to solve a
> > > particular use case. Let me explain.
> > >
> > > I have two sources of data, both like this:
> > >
> > > Key (string)
> > > DateTime (hourly granularity)
> > > Value
> > >
> > > I need to join the two sources by key and date (hour of day) to obtain:
> > >
> > > Key (string)
> > > DateTime (hourly granularity)
> > > ValueSource1
> > > ValueSource2
> > >
> > > I think that first I'd need to push the messages into Kafka topics with
> > > the date as part of the key, because I'll group by key taking the date
> > > into account. So maybe the key must be a new string like key_timestamp.
> > > But, of course, that is not the main problem; it is just an additional
> > > explanation.
> > >
> > > OK, so the data are in the topics, here we go!
> > >
> > > - Multiple records are allowed per key, but only the latest value for a
> > > record key will be considered. I should use two KTables with some join
> > > strategy, right?
> > >
> > > - Data from both sources could arrive at any time. What can I do to
> > > achieve this?
> > >
> > > Thanks in advance.
> > >
> >
> >
> > --
> > -- Guozhang
> >


--
-- Guozhang
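A minimal end-to-end sketch of the key_timestamp idea discussed above: re-key both sources so the hour bucket is part of the key, materialize them as tables, and join. The record format, the hourOf helper, and the topic names are assumptions, and toTable requires a recent Kafka Streams release (older code would build the table with groupByKey plus a reduce):

    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.kstream.KTable
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val builder = new StreamsBuilder()

    // Assumed record shape: the value starts with an ISO-8601 timestamp,
    // e.g. "2016-04-15T10:23:00Z|42.0", so its first 13 chars name the hour.
    def hourOf(value: String): String = value.take(13) // "2016-04-15T10"

    // Re-key each source so the hour bucket becomes part of the key; records
    // with the same original key and hour then meet in the join. selectKey
    // forces a repartition before the stream is materialized as a table.
    def hourlyTable(topic: String): KTable[String, String] =
      builder.stream[String, String](topic)
        .selectKey((key, value) => s"${key}_${hourOf(value)}")
        .toTable

    val source1 = hourlyTable("topic1")
    val source2 = hourlyTable("topic2")

    // Latest value per (key, hour) from each side, joined once both exist;
    // whichever side arrives later triggers the join, so arrival order does
    // not matter.
    val joined: KTable[String, String] =
      source1.join(source2)((v1, v2) => s"($v1, $v2)")

    joined.toStream.to("resultTopic")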