Hi Henry, 1) Yes, if your key space is unlimited. But in practice, for KTable streams where the record key (i.e. the primary key of the "table") is usually a client-id, service-id, etc, the key space is usually bounded, for example by the population of the globe, where in this case it should still be OK to host with parallel Kafka Streams instances :)
2) It is currently based on the record event time. More specifically, currently say you have a new Window instance created at T0 with maintenance interval 10, then the first time we received a record with timestamp T10, we will drop the window. I think this semantics can be improved to "stream time", which is less vulnerable to early out-of-ordering records. Do you want to create JIRAs for those issues I mentioned in the previous emails to keep track? Guozhang On Tue, Apr 19, 2016 at 2:29 PM, Henry Cai <h...@pinterest.com.invalid> wrote: > I have another follow-up question on the compacted kafka topic for RocksDB > replication. > > 1. From Kafka compaction implementation, looks like all keys from the past > for that topic will be preserved, (the compaction/cleaner will only delete > the records which has same-key occurrences later in the queue). If that's > the case, will we run out of disk space on kafka broker side for those > compacted topics if we keep the stream application runs too long? > > 2. For the various windows stored in RocksDB, when we do trigger the > removal/expiration of those window and keys from RocksDB? > > > On Tue, Apr 19, 2016 at 12:27 PM, Guozhang Wang <wangg...@gmail.com> > wrote: > > > 1) It sounds your should be using KTable.outerjoin(KTable) with your > case, > > but keep in mind that currently we are still working on exactly-once > > semantics, and hence currently the results may be ordering dependent. > > > > We do not support windowing in KTable since itself is an ever-updating > > changlog already, and hence its join result would also be a ever updating > > changelog stream as KTable. Reading data from KTable where values with > the > > same key may not yet been compacted as fine, as long as the operation > > itself is preserving : > > > > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key: > b, > > value: 4} > > > > Here the resulted key values may be different, but the same key input > will > > generate the same key output. Then they are still changelog records for > the > > same key. All built-in KTable operators preserve this property. On the > > other hand, if: > > > > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key: > c, > > value: 4} > > > > The it is not key-preserving, and then you may encounter some unexpected > > behavior. > > > > > > 2) log compaction is a Kafka broker feature that Kafka Streams leverage > on: > > > > https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction > > > > It is done on disk files that are not active (i.e. no longer takes > > appends). > > > > We are working on exposing the configs for log compactions such as > > compaction intervals and thresholds in Kafka Streams so that users can > > control its behavior. Actually, Henry do you mind creating a JIRA for > this > > purpose and list what you would like to control log compaction? > > > > > > Guozhang > > > > > > > > > > > > On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai <h...@pinterest.com.invalid> > > wrote: > > > > > Related to the log compaction question: " it will be log > > > compacted on the key over time", how do we control the time for log > > > compaction? For the log compaction implementation, is the storage used > > to > > > map a new value for a given key stored in memory or on disk? > > > > > > On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral < > > > guillermo.lammers.cor...@tecsisa.com> wrote: > > > > > > > Hello, > > > > > > > > Thanks again for your reply :) > > > > > > > > 1) In my example when I send a record from outer table and there is > no > > > > matching record from inner table I receive data to the output topic > and > > > > vice versa. I am trying it with the topics empties at the first > > > execution. > > > > How is possible? > > > > > > > > Why KTable joins does not support windowing strategies? I think that > > for > > > > this use cases I need it, what do you think? > > > > > > > > 2) What does it means? Although the log may not be yet compacted, > there > > > > should be no problem to read from them and execute a new stream > > process, > > > > right? (like a new joins, counts...). > > > > > > > > Thanks!! > > > > > > > > 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangg...@gmail.com>: > > > > > > > > > 1) There are three types of joins for KTable-KTable join, the > follow > > > the > > > > > same semantics in SQL joins: > > > > > > > > > > KTable.join(KTable): when there is no matching record from inner > > table > > > > when > > > > > received a new record from outer table, no output; and vice versa. > > > > > KTable.leftjoin(KTable): when there is no matching record from > inner > > > > table > > > > > when received a new record from outer table, output (a, null); on > the > > > > other > > > > > direction no output. > > > > > KTable.outerjoin(KTable): when there is no matching record from > > inner / > > > > > outer table when received a new record from outer / inner table, > > output > > > > (a, > > > > > null) or (null, b). > > > > > > > > > > > > > > > 2) The result topic is also a changelog topic, although it will be > > log > > > > > compacted on the key over time, if you consume immediately the log > > may > > > > not > > > > > be yet compacted. > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral < > > > > > guillermo.lammers.cor...@tecsisa.com> wrote: > > > > > > > > > > > Hi Guozhang, > > > > > > > > > > > > Thank you very much for your reply and sorry for the generic > > > question, > > > > > I'll > > > > > > try to explain with some pseudocode. > > > > > > > > > > > > I have two KTable with a join: > > > > > > > > > > > > ktable1: KTable[String, String] = builder.table("topic1") > > > > > > ktable2: KTable[String, String] = builder.table("topic2") > > > > > > > > > > > > result: KTable[String, ResultUnion] = > > > > > > ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1, > > > data2)) > > > > > > > > > > > > I send the result to a topic result.to("resultTopic"). > > > > > > > > > > > > My questions are related with the following scenario: > > > > > > > > > > > > - The streming is up & running without data in topics > > > > > > > > > > > > - I send data to "topic2", for example a key/value like that > > > > > ("uniqueKey1", > > > > > > "hello") > > > > > > > > > > > > - I see null values in topic "resultTopic", i.e. ("uniqueKey1", > > null) > > > > > > > > > > > > - If I send data to "topic1", for example a key/value like that > > > > > > ("uniqueKey1", "world") then I see this values in topic > > > "resultTopic", > > > > > > ("uniqueKey1", ResultUnion("hello", "world")) > > > > > > > > > > > > Q: If we send data for one of the KTable that does not have the > > > > > > corresponding data by key in the other one, obtain null values in > > the > > > > > > result final topic is the expected behavior? > > > > > > > > > > > > My next step would be use Kafka Connect to persist result data in > > C* > > > (I > > > > > > have not read yet the Connector docs...), is this the way to do > it? > > > (I > > > > > mean > > > > > > prepare the data in the topic). > > > > > > > > > > > > Q: On the other hand, just to try, I have a KTable that read > > messages > > > > in > > > > > > "resultTopic" and prints them. If the stream is a KTable I am > > > wondering > > > > > why > > > > > > is getting all the values from the topic even those with the same > > > key? > > > > > > > > > > > > Thanks in advance! Great job answering community! > > > > > > > > > > > > 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangg...@gmail.com>: > > > > > > > > > > > > > Hi Guillermo, > > > > > > > > > > > > > > 1) Yes in your case, the streams are really a "changelog" > stream, > > > > hence > > > > > > you > > > > > > > should create the stream as KTable, and do KTable-KTable join. > > > > > > > > > > > > > > 2) Could elaborate about "achieving this"? What behavior do > > require > > > > in > > > > > > the > > > > > > > application logic? > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral < > > > > > > > guillermo.lammers.cor...@tecsisa.com> wrote: > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > I am a newbie to Kafka Streams and I am using it trying to > > solve > > > a > > > > > > > > particular use case. Let me explain. > > > > > > > > > > > > > > > > I have two sources of data both like that: > > > > > > > > > > > > > > > > Key (string) > > > > > > > > DateTime (hourly granularity) > > > > > > > > Value > > > > > > > > > > > > > > > > I need to join the two sources by key and date (hour of day) > to > > > > > obtain: > > > > > > > > > > > > > > > > Key (string) > > > > > > > > DateTime (hourly granularity) > > > > > > > > ValueSource1 > > > > > > > > ValueSource2 > > > > > > > > > > > > > > > > I think that first I'd need to push the messages in Kafka > > topics > > > > with > > > > > > the > > > > > > > > date as part of the key because I'll group by key taking into > > > > account > > > > > > the > > > > > > > > date. So maybe the key must be a new string like > key_timestamp. > > > > But, > > > > > of > > > > > > > > course, it is not the main problem, is just an additional > > > > > explanation. > > > > > > > > > > > > > > > > Ok, so data are in topics, here we go! > > > > > > > > > > > > > > > > - Multiple records allows per key but only the latest value > > for a > > > > > > record > > > > > > > > key will be considered. I should use two KTable with some > join > > > > > > strategy, > > > > > > > > right? > > > > > > > > > > > > > > > > - Data of both sources could arrive at any time. What can I > do > > to > > > > > > achieve > > > > > > > > this? > > > > > > > > > > > > > > > > Thanks in advance. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > -- > > -- Guozhang > > > -- -- Guozhang