Thank you very much for all your responses, I have learned a lot. Back to the question I asked at the start of the thread about the right way to process two datasets (~a million records each) whose corresponding entries in each KTable can arrive at any time, i.e., each one could be from days, weeks or months ago... I would like to know whether Kafka Streams is really a good fit for that scenario in terms of disk space and memory usage, and whether the key space ends up effectively unbounded, as Henry said.
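For context, the core of the PoC I am preparing looks roughly like the sketch below (Java DSL). The topic names, value types and the join logic are placeholders I made up, and the exact builder/config class names may vary with the Kafka Streams version:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class KTableJoinPoc {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-join-poc");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Each source topic is read as a changelog: the latest value per key wins.
        KTable<String, String> table1 = builder.table("topic1");
        KTable<String, String> table2 = builder.table("topic2");

        // Outer join: a result is (re-)emitted whenever either side gets a new
        // value for a key; the missing side is null until it arrives.
        KTable<String, String> result =
                table1.outerJoin(table2, (v1, v2) -> v1 + "|" + v2);

        result.toStream().to("resultTopic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}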
That is the direction I'm taking for the PoC, but I would still like to hear your point of view. Thanks Guozhang, Henry and Matthias.

2016-04-20 11:47 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:

> Log compaction can also delete keys if the payload for a key is null:
>
> "Compaction also allows for deletes. A message with a key and a null
> payload will be treated as a delete from the log. This delete marker
> will cause any prior message with that key to be removed (as would any
> new message with that key), but delete markers are special in that they
> will themselves be cleaned out of the log after a period of time. The
> point in time at which deletes are no longer retained is given above."
>
> See: https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
>
> Hope this helps.
>
> -Matthias
>
> On 04/20/2016 08:28 AM, Henry Cai wrote:
> > In my case, the key space is unbounded. The key would be something like
> > 'ad_id', and this id is auto-incrementing all the time. I understand the
> > benefit of using a compacted kafka topic for an aggregation store, but I
> > don't see much benefit in using compaction to replicate records in a
> > JoinWindow (there are not many duplicates in that window). Can we
> > specify not to use compaction for some state store replication?
> >
> > The window expiration policy on pure event time sounds risky: one
> > out-of-order record will drop still-active windows. We probably need a
> > policy that depends on both stream time and event time.
> >
> > I can file JIRAs for these two. For the issue of controlling compaction
> > time, I am not sure how to word the details, I will leave this up to you.
> >
> >
> > On Tue, Apr 19, 2016 at 6:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hi Henry,
> >>
> >> 1) Yes, if your key space is unlimited. But in practice, for KTable
> >> streams where the record key (i.e. the primary key of the "table") is
> >> usually a client-id, service-id, etc., the key space is usually bounded,
> >> for example by the population of the globe, in which case it should
> >> still be OK to host it with parallel Kafka Streams instances :)
> >>
> >> 2) It is currently based on the record event time. More specifically,
> >> say you have a new Window instance created at T0 with maintenance
> >> interval 10; then the first time we receive a record with timestamp T10,
> >> we will drop the window. I think these semantics can be improved to use
> >> "stream time", which is less vulnerable to early out-of-order records.
> >>
> >> Do you want to create JIRAs for those issues I mentioned in the previous
> >> emails to keep track?
> >>
> >> Guozhang
> >>
> >> On Tue, Apr 19, 2016 at 2:29 PM, Henry Cai <h...@pinterest.com.invalid>
> >> wrote:
> >>
> >>> I have another follow-up question on the compacted kafka topic for
> >>> RocksDB replication.
> >>>
> >>> 1. From the Kafka compaction implementation, it looks like all keys
> >>> from the past for that topic will be preserved (the compaction/cleaner
> >>> will only delete the records which have same-key occurrences later in
> >>> the queue). If that's the case, will we run out of disk space on the
> >>> kafka broker side for those compacted topics if we keep the stream
> >>> application running too long?
> >>>
> >>> 2. For the various windows stored in RocksDB, when do we trigger the
> >>> removal/expiration of those windows and keys from RocksDB?
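(One thing I want to double-check about the tombstone point above: if I understand it correctly, deleting a key from a compacted topic is just a matter of producing a record with a null payload for that key, something like the sketch below. The topic and key names are invented for the example.)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TombstoneExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value acts as a delete marker ("tombstone") on a compacted
            // topic: compaction removes older records for this key, and the
            // tombstone itself is eventually cleaned out as well.
            producer.send(new ProducerRecord<>("my-compacted-topic", "ad_42", null));
        }
    }
}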
> >>>
> >>> On Tue, Apr 19, 2016 at 12:27 PM, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>>> 1) It sounds like you should be using KTable.outerjoin(KTable) for
> >>>> your case, but keep in mind that we are still working on exactly-once
> >>>> semantics, and hence currently the results may be ordering dependent.
> >>>>
> >>>> We do not support windowing in KTable since it is itself an
> >>>> ever-updating changelog already, and hence its join result would also
> >>>> be an ever-updating changelog stream as a KTable. Reading data from a
> >>>> KTable where values with the same key may not yet have been compacted
> >>>> is fine, as long as the operation itself is key-preserving:
> >>>>
> >>>> F( {key: a, value: 1}, {key: a, value: 2} ) =>
> >>>>     {key: b, value: 3}, {key: b, value: 4}
> >>>>
> >>>> Here the resulting key values may be different, but the same key input
> >>>> will generate the same key output, so they are still changelog records
> >>>> for the same key. All built-in KTable operators preserve this property.
> >>>> On the other hand, if:
> >>>>
> >>>> F( {key: a, value: 1}, {key: a, value: 2} ) =>
> >>>>     {key: b, value: 3}, {key: c, value: 4}
> >>>>
> >>>> then it is not key-preserving, and you may encounter some unexpected
> >>>> behavior.
> >>>>
> >>>> 2) Log compaction is a Kafka broker feature that Kafka Streams
> >>>> leverages:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
> >>>>
> >>>> It is done on disk files that are not active (i.e. no longer take
> >>>> appends).
> >>>>
> >>>> We are working on exposing the configs for log compaction, such as
> >>>> compaction intervals and thresholds, in Kafka Streams so that users
> >>>> can control its behavior. Actually, Henry, do you mind creating a JIRA
> >>>> for this purpose and listing what you would like to control about log
> >>>> compaction?
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai <h...@pinterest.com.invalid>
> >>>> wrote:
> >>>>
> >>>>> Related to the log compaction question ("it will be log compacted on
> >>>>> the key over time"): how do we control the time for log compaction?
> >>>>> For the log compaction implementation, is the storage used to map a
> >>>>> new value for a given key kept in memory or on disk?
> >>>>>
> >>>>> On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral <
> >>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
> >>>>>
> >>>>>> Hello,
> >>>>>>
> >>>>>> Thanks again for your reply :)
> >>>>>>
> >>>>>> 1) In my example, when I send a record from the outer table and
> >>>>>> there is no matching record from the inner table, I still receive
> >>>>>> data in the output topic, and vice versa. I am trying it with the
> >>>>>> topics empty at the first execution. How is that possible?
> >>>>>>
> >>>>>> Why don't KTable joins support windowing strategies? I think that
> >>>>>> for this use case I need them, what do you think?
> >>>>>>
> >>>>>> 2) What does that mean? Although the log may not yet be compacted,
> >>>>>> there should be no problem reading from it and executing a new
> >>>>>> stream process (like new joins, counts...), right?
> >>>>>>
> >>>>>> Thanks!!
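(The key-preserving property described above is the part I want to be careful with in my own code. As I understand it, value-only transformations such as mapValues keep the changelog property, while anything that derives a new key does not; a rough illustration I put together in Java, with made-up types:)

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class KeyPreservationExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder.table("topic1");

        // Key-preserving: the output is still a changelog over the same keys,
        // so later updates for a key correctly overwrite earlier ones.
        KTable<String, Long> lengths = table.mapValues(v -> (long) v.length());

        // Not key-preserving: deriving a new key from the value. On a KTable
        // this has to go through toStream()/selectKey() (or a groupBy()),
        // which repartitions the data and changes which records share a key.
        KStream<String, String> rekeyed =
                table.toStream().selectKey((k, v) -> v.substring(0, 1));

        System.out.println(builder.build().describe());
    }
}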
> >>>>>>
> >>>>>> 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> >>>>>>
> >>>>>>> 1) There are three types of KTable-KTable joins; they follow the
> >>>>>>> same semantics as SQL joins:
> >>>>>>>
> >>>>>>> KTable.join(KTable): when there is no matching record from the
> >>>>>>> inner table when a new record is received from the outer table, no
> >>>>>>> output; and vice versa.
> >>>>>>> KTable.leftjoin(KTable): when there is no matching record from the
> >>>>>>> inner table when a new record is received from the outer table,
> >>>>>>> output (a, null); in the other direction, no output.
> >>>>>>> KTable.outerjoin(KTable): when there is no matching record from the
> >>>>>>> inner / outer table when a new record is received from the outer /
> >>>>>>> inner table, output (a, null) or (null, b).
> >>>>>>>
> >>>>>>> 2) The result topic is also a changelog topic. Although it will be
> >>>>>>> log-compacted on the key over time, if you consume it immediately
> >>>>>>> the log may not yet be compacted.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
> >>>>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Guozhang,
> >>>>>>>>
> >>>>>>>> Thank you very much for your reply, and sorry for the generic
> >>>>>>>> question; I'll try to explain with some pseudocode.
> >>>>>>>>
> >>>>>>>> I have two KTables with a join:
> >>>>>>>>
> >>>>>>>> ktable1: KTable[String, String] = builder.table("topic1")
> >>>>>>>> ktable2: KTable[String, String] = builder.table("topic2")
> >>>>>>>>
> >>>>>>>> result: KTable[String, ResultUnion] =
> >>>>>>>>   ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1, data2))
> >>>>>>>>
> >>>>>>>> I send the result to a topic with result.to("resultTopic").
> >>>>>>>>
> >>>>>>>> My questions are related to the following scenario:
> >>>>>>>>
> >>>>>>>> - The streaming app is up & running without data in the topics
> >>>>>>>>
> >>>>>>>> - I send data to "topic2", for example a key/value like
> >>>>>>>>   ("uniqueKey1", "hello")
> >>>>>>>>
> >>>>>>>> - I see null values in topic "resultTopic", i.e. ("uniqueKey1", null)
> >>>>>>>>
> >>>>>>>> - If I then send data to "topic1", for example a key/value like
> >>>>>>>>   ("uniqueKey1", "world"), I see these values in topic "resultTopic":
> >>>>>>>>   ("uniqueKey1", ResultUnion("hello", "world"))
> >>>>>>>>
> >>>>>>>> Q: If we send data for one of the KTables that does not have the
> >>>>>>>> corresponding data (by key) in the other one, is getting null
> >>>>>>>> values in the final result topic the expected behavior?
> >>>>>>>>
> >>>>>>>> My next step would be to use Kafka Connect to persist the result
> >>>>>>>> data in C* (I have not read the Connector docs yet...), is this the
> >>>>>>>> way to do it? (I mean, preparing the data in the topic.)
> >>>>>>>>
> >>>>>>>> Q: On the other hand, just to try things out, I have a KTable that
> >>>>>>>> reads messages from "resultTopic" and prints them. If the stream is
> >>>>>>>> a KTable, I am wondering why it is getting all the values from the
> >>>>>>>> topic, even those with the same key?
> >>>>>>>>
> >>>>>>>> Thanks in advance! Great job answering, community!
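(For my own notes, the three variants described above would look roughly like the following with the Java DSL. The joiner that concatenates the two values and the topic names are just placeholders:)

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class KTableJoinVariants {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> left = builder.table("topic1");
        KTable<String, String> right = builder.table("topic2");

        // Inner join: output only once both sides have a value for the key.
        KTable<String, String> inner =
                left.join(right, (l, r) -> l + "," + r);

        // Left join: output (l, null) when only the left side has a value;
        // nothing when only the right side has a value.
        KTable<String, String> leftJoined =
                left.leftJoin(right, (l, r) -> l + "," + r);

        // Outer join: output (l, null) or (null, r) when only one side has a
        // value for the key.
        KTable<String, String> outer =
                left.outerJoin(right, (l, r) -> l + "," + r);

        inner.toStream().to("inner-result");
        leftJoined.toStream().to("left-result");
        outer.toStream().to("outer-result");
    }
}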
> >>>>>>>>
> >>>>>>>> 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> >>>>>>>>
> >>>>>>>>> Hi Guillermo,
> >>>>>>>>>
> >>>>>>>>> 1) Yes, in your case the streams are really "changelog" streams,
> >>>>>>>>> hence you should create them as KTables and do a KTable-KTable
> >>>>>>>>> join.
> >>>>>>>>>
> >>>>>>>>> 2) Could you elaborate on "achieving this"? What behavior do you
> >>>>>>>>> require in the application logic?
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral <
> >>>>>>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I am a newbie to Kafka Streams and I am trying to use it to
> >>>>>>>>>> solve a particular use case. Let me explain.
> >>>>>>>>>>
> >>>>>>>>>> I have two sources of data, both like this:
> >>>>>>>>>>
> >>>>>>>>>> Key (string)
> >>>>>>>>>> DateTime (hourly granularity)
> >>>>>>>>>> Value
> >>>>>>>>>>
> >>>>>>>>>> I need to join the two sources by key and date (hour of day) to
> >>>>>>>>>> obtain:
> >>>>>>>>>>
> >>>>>>>>>> Key (string)
> >>>>>>>>>> DateTime (hourly granularity)
> >>>>>>>>>> ValueSource1
> >>>>>>>>>> ValueSource2
> >>>>>>>>>>
> >>>>>>>>>> I think that first I'd need to push the messages into Kafka
> >>>>>>>>>> topics with the date as part of the key, because I'll group by
> >>>>>>>>>> key taking the date into account. So maybe the key must be a new
> >>>>>>>>>> string like key_timestamp. But, of course, that is not the main
> >>>>>>>>>> problem, just an additional explanation.
> >>>>>>>>>>
> >>>>>>>>>> Ok, so the data are in topics, here we go!
> >>>>>>>>>>
> >>>>>>>>>> - Multiple records are allowed per key, but only the latest
> >>>>>>>>>> value for a record key will be considered. I should use two
> >>>>>>>>>> KTables with some join strategy, right?
> >>>>>>>>>>
> >>>>>>>>>> - Data from both sources could arrive at any time. What can I do
> >>>>>>>>>> to achieve this?
> >>>>>>>>>>
> >>>>>>>>>> Thanks in advance.
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
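(P.S. Regarding the key_timestamp idea in my original message above: the way I am thinking of building the composite key, so that both sources line up on key + hour, is roughly the helper below. The hour-truncated format and the method name are just something I made up.)

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HourlyKey {
    // Formats an instant down to the hour, in UTC, e.g. "2016-04-20T11".
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH").withZone(ZoneOffset.UTC);

    // Builds a composite key like "myKey_2016-04-20T11", so records from both
    // sources for the same key and hour end up under the same topic key.
    static String hourlyKey(String key, Instant eventTime) {
        return key + "_" + HOUR.format(eventTime);
    }

    public static void main(String[] args) {
        System.out.println(hourlyKey("myKey", Instant.parse("2016-04-20T11:47:00Z")));
        // prints: myKey_2016-04-20T11
    }
}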