Thank you very much for all your responses, I have learned a lot.

Backs to the question I asked at the start of the thread regarding the
correct process of two datasets (~million data records) in which
corresponding entry in each KTable will be sent at any time, i.e., each one
could have been from days, weeks, months ago... I would like to know if
Kafka Streams is definitely a good choice for that scenarios in terms of
disk space, memory usage, or whether the key space is unbounded like Henry
said.

I'm preparing a PoC about that but I would like to know your point of view
after all.

Thanks Guozhang, Henry and Matthias.

2016-04-20 11:47 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:

> Log compaction can also delete keys if the payload for a key is null:
>
> "Compaction also allows from deletes. A message with a key and a null
> payload will be treated as a delete from the log. This delete marker
> will cause any prior message with that key to be removed (as would any
> new message with that key), but delete markers are special in they will
> themselves be cleaned out of the log after a period of time. The point
> in time at which deletes are no longer retained is given above."
>
> See: https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
>
> Hope this helps.
>
> -Matthias
>
>
>
> On 04/20/2016 08:28 AM, Henry Cai wrote:
> > In my case, the key space is unbounded.  The key would be something like
> > 'ad_id', this id is auto incrementing all the time.  I understand the
> > benefit of using compacted kafka topic for aggregation store, but I don't
> > see much benefit of using compaction to replicate records in JoinWindow
> > (there are not many duplicates in that window).  Can we specify not to
> use
> > compaction for some state store replication?
> >
> > The window expiration policy on pure event time sounds risky, one
> > out-of-order record will drop still active windows.  We probably need a
> > policy to depend on both stream time and event time.
> >
> > I can fire JIRAs for these two.  For the issue of controlling compaction
> > time, I am not sure how to word the details, I will leave this up to you.
> >
> >
> > On Tue, Apr 19, 2016 at 6:19 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hi Henry,
> >>
> >> 1) Yes, if your key space is unlimited. But in practice, for KTable
> streams
> >> where the record key (i.e. the primary key of the "table") is usually a
> >> client-id, service-id, etc, the key space is usually bounded, for
> example
> >> by the population of the globe, where in this case it should still be
> OK to
> >> host with parallel Kafka Streams instances :)
> >>
> >> 2) It is currently based on the record event time. More specifically,
> >> currently say you have a new Window instance created at T0 with
> maintenance
> >> interval 10, then the first time we received a record with timestamp
> T10,
> >> we will drop the window. I think this semantics can be improved to
> "stream
> >> time", which is less vulnerable to early out-of-ordering records.
> >>
> >>
> >> Do you want to create JIRAs for those issues I mentioned in the previous
> >> emails to keep track?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Tue, Apr 19, 2016 at 2:29 PM, Henry Cai <h...@pinterest.com.invalid>
> >> wrote:
> >>
> >>> I have another follow-up question on the compacted kafka topic for
> >> RocksDB
> >>> replication.
> >>>
> >>> 1. From Kafka compaction implementation, looks like all keys from the
> >> past
> >>> for that topic will be preserved, (the compaction/cleaner will only
> >> delete
> >>> the records which has same-key occurrences later in the queue).  If
> >> that's
> >>> the case, will we run out of disk space on kafka broker side for those
> >>> compacted topics if we keep the stream application runs too long?
> >>>
> >>> 2. For the various windows stored in RocksDB, when we do trigger the
> >>> removal/expiration of those window and keys from RocksDB?
> >>>
> >>>
> >>> On Tue, Apr 19, 2016 at 12:27 PM, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>>> 1) It sounds your should be using KTable.outerjoin(KTable) with your
> >>> case,
> >>>> but keep in mind that currently we are still working on exactly-once
> >>>> semantics, and hence currently the results may be ordering dependent.
> >>>>
> >>>> We do not support windowing in KTable since itself is an ever-updating
> >>>> changlog already, and hence its join result would also be a ever
> >> updating
> >>>> changelog stream as KTable. Reading data from KTable where values with
> >>> the
> >>>> same key may not yet been compacted as fine, as long as the operation
> >>>> itself is preserving :
> >>>>
> >>>> F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3},
> >> {key:
> >>> b,
> >>>> value: 4}
> >>>>
> >>>> Here the resulted key values may be different, but the same key input
> >>> will
> >>>> generate the same key output. Then they are still changelog records
> for
> >>> the
> >>>> same key. All built-in KTable operators preserve this property. On the
> >>>> other hand, if:
> >>>>
> >>>> F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3},
> >> {key:
> >>> c,
> >>>> value: 4}
> >>>>
> >>>> The it is not key-preserving, and then you may encounter some
> >> unexpected
> >>>> behavior.
> >>>>
> >>>>
> >>>> 2) log compaction is a Kafka broker feature that Kafka Streams
> leverage
> >>> on:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
> >>>>
> >>>> It is done on disk files that are not active (i.e. no longer takes
> >>>> appends).
> >>>>
> >>>> We are working on exposing the configs for log compactions such as
> >>>> compaction intervals and thresholds in Kafka Streams so that users can
> >>>> control its behavior. Actually, Henry do you mind creating a JIRA for
> >>> this
> >>>> purpose and list what you would like to control log compaction?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai
> <h...@pinterest.com.invalid
> >>>
> >>>> wrote:
> >>>>
> >>>>> Related to the log compaction question: " it will be log
> >>>>> compacted on the key over time", how do we control the time for log
> >>>>> compaction?  For the log compaction implementation, is the storage
> >> used
> >>>> to
> >>>>> map a new value for a given key stored in memory or on disk?
> >>>>>
> >>>>> On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral <
> >>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
> >>>>>
> >>>>>> Hello,
> >>>>>>
> >>>>>> Thanks again for your reply :)
> >>>>>>
> >>>>>> 1) In my example when I send a record from outer table and there is
> >>> no
> >>>>>> matching record from inner table I receive data to the output topic
> >>> and
> >>>>>> vice versa. I am trying it with the topics empties at the first
> >>>>> execution.
> >>>>>> How is possible?
> >>>>>>
> >>>>>> Why KTable joins does not support windowing strategies? I think
> >> that
> >>>> for
> >>>>>> this use cases I need it, what do you think?
> >>>>>>
> >>>>>> 2) What does it means? Although the log may not be yet compacted,
> >>> there
> >>>>>> should be no problem to read from them and execute a new stream
> >>>> process,
> >>>>>> right? (like a new joins, counts...).
> >>>>>>
> >>>>>> Thanks!!
> >>>>>>
> >>>>>> 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> >>>>>>
> >>>>>>> 1) There are three types of joins for KTable-KTable join, the
> >>> follow
> >>>>> the
> >>>>>>> same semantics in SQL joins:
> >>>>>>>
> >>>>>>> KTable.join(KTable): when there is no matching record from inner
> >>>> table
> >>>>>> when
> >>>>>>> received a new record from outer table, no output; and vice
> >> versa.
> >>>>>>> KTable.leftjoin(KTable): when there is no matching record from
> >>> inner
> >>>>>> table
> >>>>>>> when received a new record from outer table, output (a, null); on
> >>> the
> >>>>>> other
> >>>>>>> direction no output.
> >>>>>>> KTable.outerjoin(KTable): when there is no matching record from
> >>>> inner /
> >>>>>>> outer table when received a new record from outer / inner table,
> >>>> output
> >>>>>> (a,
> >>>>>>> null) or (null, b).
> >>>>>>>
> >>>>>>>
> >>>>>>> 2) The result topic is also a changelog topic, although it will
> >> be
> >>>> log
> >>>>>>> compacted on the key over time, if you consume immediately the
> >> log
> >>>> may
> >>>>>> not
> >>>>>>> be yet compacted.
> >>>>>>>
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
> >>>>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Guozhang,
> >>>>>>>>
> >>>>>>>> Thank you very much for your reply and sorry for the generic
> >>>>> question,
> >>>>>>> I'll
> >>>>>>>> try to explain with some pseudocode.
> >>>>>>>>
> >>>>>>>> I have two KTable with a join:
> >>>>>>>>
> >>>>>>>> ktable1: KTable[String, String] = builder.table("topic1")
> >>>>>>>> ktable2: KTable[String, String] = builder.table("topic2")
> >>>>>>>>
> >>>>>>>> result: KTable[String, ResultUnion] =
> >>>>>>>> ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1,
> >>>>> data2))
> >>>>>>>>
> >>>>>>>> I send the result to a topic result.to("resultTopic").
> >>>>>>>>
> >>>>>>>> My questions are related with the following scenario:
> >>>>>>>>
> >>>>>>>> - The streming is up & running without data in topics
> >>>>>>>>
> >>>>>>>> - I send data to "topic2", for example a key/value like that
> >>>>>>> ("uniqueKey1",
> >>>>>>>> "hello")
> >>>>>>>>
> >>>>>>>> - I see null values in topic "resultTopic", i.e. ("uniqueKey1",
> >>>> null)
> >>>>>>>>
> >>>>>>>> - If I send data to "topic1", for example a key/value like that
> >>>>>>>> ("uniqueKey1", "world") then I see this values in topic
> >>>>> "resultTopic",
> >>>>>>>> ("uniqueKey1", ResultUnion("hello", "world"))
> >>>>>>>>
> >>>>>>>> Q: If we send data for one of the KTable that does not have the
> >>>>>>>> corresponding data by key in the other one, obtain null values
> >> in
> >>>> the
> >>>>>>>> result final topic is the expected behavior?
> >>>>>>>>
> >>>>>>>> My next step would be use Kafka Connect to persist result data
> >> in
> >>>> C*
> >>>>> (I
> >>>>>>>> have not read yet the Connector docs...), is this the way to do
> >>> it?
> >>>>> (I
> >>>>>>> mean
> >>>>>>>> prepare the data in the topic).
> >>>>>>>>
> >>>>>>>> Q: On the other hand, just to try, I have a KTable that read
> >>>> messages
> >>>>>> in
> >>>>>>>> "resultTopic" and prints them. If the stream is a KTable I am
> >>>>> wondering
> >>>>>>> why
> >>>>>>>> is getting all the values from the topic even those with the
> >> same
> >>>>> key?
> >>>>>>>>
> >>>>>>>> Thanks in advance! Great job answering community!
> >>>>>>>>
> >>>>>>>> 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> >>>>>>>>
> >>>>>>>>> Hi Guillermo,
> >>>>>>>>>
> >>>>>>>>> 1) Yes in your case, the streams are really a "changelog"
> >>> stream,
> >>>>>> hence
> >>>>>>>> you
> >>>>>>>>> should create the stream as KTable, and do KTable-KTable
> >> join.
> >>>>>>>>>
> >>>>>>>>> 2) Could elaborate about "achieving this"? What behavior do
> >>>> require
> >>>>>> in
> >>>>>>>> the
> >>>>>>>>> application logic?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral <
> >>>>>>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I am a newbie to Kafka Streams and I am using it trying to
> >>>> solve
> >>>>> a
> >>>>>>>>>> particular use case. Let me explain.
> >>>>>>>>>>
> >>>>>>>>>> I have two sources of data both like that:
> >>>>>>>>>>
> >>>>>>>>>> Key (string)
> >>>>>>>>>> DateTime (hourly granularity)
> >>>>>>>>>> Value
> >>>>>>>>>>
> >>>>>>>>>> I need to join the two sources by key and date (hour of
> >> day)
> >>> to
> >>>>>>> obtain:
> >>>>>>>>>>
> >>>>>>>>>> Key (string)
> >>>>>>>>>> DateTime (hourly granularity)
> >>>>>>>>>> ValueSource1
> >>>>>>>>>> ValueSource2
> >>>>>>>>>>
> >>>>>>>>>> I think that first I'd need to push the messages in Kafka
> >>>> topics
> >>>>>> with
> >>>>>>>> the
> >>>>>>>>>> date as part of the key because I'll group by key taking
> >> into
> >>>>>> account
> >>>>>>>> the
> >>>>>>>>>> date. So maybe the key must be a new string like
> >>> key_timestamp.
> >>>>>> But,
> >>>>>>> of
> >>>>>>>>>> course, it is not the main problem, is just an additional
> >>>>>>> explanation.
> >>>>>>>>>>
> >>>>>>>>>> Ok, so data are in topics, here we go!
> >>>>>>>>>>
> >>>>>>>>>> - Multiple records allows per key but only the latest value
> >>>> for a
> >>>>>>>> record
> >>>>>>>>>> key will be considered. I should use two KTable with some
> >>> join
> >>>>>>>> strategy,
> >>>>>>>>>> right?
> >>>>>>>>>>
> >>>>>>>>>> - Data of both sources could arrive at any time. What can I
> >>> do
> >>>> to
> >>>>>>>> achieve
> >>>>>>>>>> this?
> >>>>>>>>>>
> >>>>>>>>>> Thanks in advance.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>

Reply via email to