Hi Henry,

1) Yes, if your key space is unlimited. But in practice, for KTable streams
where the record key (i.e. the primary key of the "table") is usually a
client-id, service-id, etc, the key space is usually bounded, for example
by the population of the globe, where in this case it should still be OK to
host with parallel Kafka Streams instances :)

2) It is currently based on the record event time. More specifically,
currently say you have a new Window instance created at T0 with maintenance
interval 10, then the first time we received a record with timestamp T10,
we will drop the window. I think this semantics can be improved to "stream
time", which is less vulnerable to early out-of-ordering records.


Do you want to create JIRAs for those issues I mentioned in the previous
emails to keep track?


Guozhang


On Tue, Apr 19, 2016 at 2:29 PM, Henry Cai <h...@pinterest.com.invalid>
wrote:

> I have another follow-up question on the compacted kafka topic for RocksDB
> replication.
>
> 1. From Kafka compaction implementation, looks like all keys from the past
> for that topic will be preserved, (the compaction/cleaner will only delete
> the records which has same-key occurrences later in the queue).  If that's
> the case, will we run out of disk space on kafka broker side for those
> compacted topics if we keep the stream application runs too long?
>
> 2. For the various windows stored in RocksDB, when we do trigger the
> removal/expiration of those window and keys from RocksDB?
>
>
> On Tue, Apr 19, 2016 at 12:27 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > 1) It sounds your should be using KTable.outerjoin(KTable) with your
> case,
> > but keep in mind that currently we are still working on exactly-once
> > semantics, and hence currently the results may be ordering dependent.
> >
> > We do not support windowing in KTable since itself is an ever-updating
> > changlog already, and hence its join result would also be a ever updating
> > changelog stream as KTable. Reading data from KTable where values with
> the
> > same key may not yet been compacted as fine, as long as the operation
> > itself is preserving :
> >
> > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key:
> b,
> > value: 4}
> >
> > Here the resulted key values may be different, but the same key input
> will
> > generate the same key output. Then they are still changelog records for
> the
> > same key. All built-in KTable operators preserve this property. On the
> > other hand, if:
> >
> > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key:
> c,
> > value: 4}
> >
> > The it is not key-preserving, and then you may encounter some unexpected
> > behavior.
> >
> >
> > 2) log compaction is a Kafka broker feature that Kafka Streams leverage
> on:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
> >
> > It is done on disk files that are not active (i.e. no longer takes
> > appends).
> >
> > We are working on exposing the configs for log compactions such as
> > compaction intervals and thresholds in Kafka Streams so that users can
> > control its behavior. Actually, Henry do you mind creating a JIRA for
> this
> > purpose and list what you would like to control log compaction?
> >
> >
> > Guozhang
> >
> >
> >
> >
> >
> > On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai <h...@pinterest.com.invalid>
> > wrote:
> >
> > > Related to the log compaction question: " it will be log
> > > compacted on the key over time", how do we control the time for log
> > > compaction?  For the log compaction implementation, is the storage used
> > to
> > > map a new value for a given key stored in memory or on disk?
> > >
> > > On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral <
> > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > >
> > > > Hello,
> > > >
> > > > Thanks again for your reply :)
> > > >
> > > > 1) In my example when I send a record from outer table and there is
> no
> > > > matching record from inner table I receive data to the output topic
> and
> > > > vice versa. I am trying it with the topics empties at the first
> > > execution.
> > > > How is possible?
> > > >
> > > > Why KTable joins does not support windowing strategies? I think that
> > for
> > > > this use cases I need it, what do you think?
> > > >
> > > > 2) What does it means? Although the log may not be yet compacted,
> there
> > > > should be no problem to read from them and execute a new stream
> > process,
> > > > right? (like a new joins, counts...).
> > > >
> > > > Thanks!!
> > > >
> > > > 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> > > >
> > > > > 1) There are three types of joins for KTable-KTable join, the
> follow
> > > the
> > > > > same semantics in SQL joins:
> > > > >
> > > > > KTable.join(KTable): when there is no matching record from inner
> > table
> > > > when
> > > > > received a new record from outer table, no output; and vice versa.
> > > > > KTable.leftjoin(KTable): when there is no matching record from
> inner
> > > > table
> > > > > when received a new record from outer table, output (a, null); on
> the
> > > > other
> > > > > direction no output.
> > > > > KTable.outerjoin(KTable): when there is no matching record from
> > inner /
> > > > > outer table when received a new record from outer / inner table,
> > output
> > > > (a,
> > > > > null) or (null, b).
> > > > >
> > > > >
> > > > > 2) The result topic is also a changelog topic, although it will be
> > log
> > > > > compacted on the key over time, if you consume immediately the log
> > may
> > > > not
> > > > > be yet compacted.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
> > > > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > Thank you very much for your reply and sorry for the generic
> > > question,
> > > > > I'll
> > > > > > try to explain with some pseudocode.
> > > > > >
> > > > > > I have two KTable with a join:
> > > > > >
> > > > > > ktable1: KTable[String, String] = builder.table("topic1")
> > > > > > ktable2: KTable[String, String] = builder.table("topic2")
> > > > > >
> > > > > > result: KTable[String, ResultUnion] =
> > > > > > ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1,
> > > data2))
> > > > > >
> > > > > > I send the result to a topic result.to("resultTopic").
> > > > > >
> > > > > > My questions are related with the following scenario:
> > > > > >
> > > > > > - The streming is up & running without data in topics
> > > > > >
> > > > > > - I send data to "topic2", for example a key/value like that
> > > > > ("uniqueKey1",
> > > > > > "hello")
> > > > > >
> > > > > > - I see null values in topic "resultTopic", i.e. ("uniqueKey1",
> > null)
> > > > > >
> > > > > > - If I send data to "topic1", for example a key/value like that
> > > > > > ("uniqueKey1", "world") then I see this values in topic
> > > "resultTopic",
> > > > > > ("uniqueKey1", ResultUnion("hello", "world"))
> > > > > >
> > > > > > Q: If we send data for one of the KTable that does not have the
> > > > > > corresponding data by key in the other one, obtain null values in
> > the
> > > > > > result final topic is the expected behavior?
> > > > > >
> > > > > > My next step would be use Kafka Connect to persist result data in
> > C*
> > > (I
> > > > > > have not read yet the Connector docs...), is this the way to do
> it?
> > > (I
> > > > > mean
> > > > > > prepare the data in the topic).
> > > > > >
> > > > > > Q: On the other hand, just to try, I have a KTable that read
> > messages
> > > > in
> > > > > > "resultTopic" and prints them. If the stream is a KTable I am
> > > wondering
> > > > > why
> > > > > > is getting all the values from the topic even those with the same
> > > key?
> > > > > >
> > > > > > Thanks in advance! Great job answering community!
> > > > > >
> > > > > > 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> > > > > >
> > > > > > > Hi Guillermo,
> > > > > > >
> > > > > > > 1) Yes in your case, the streams are really a "changelog"
> stream,
> > > > hence
> > > > > > you
> > > > > > > should create the stream as KTable, and do KTable-KTable join.
> > > > > > >
> > > > > > > 2) Could elaborate about "achieving this"? What behavior do
> > require
> > > > in
> > > > > > the
> > > > > > > application logic?
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral <
> > > > > > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I am a newbie to Kafka Streams and I am using it trying to
> > solve
> > > a
> > > > > > > > particular use case. Let me explain.
> > > > > > > >
> > > > > > > > I have two sources of data both like that:
> > > > > > > >
> > > > > > > > Key (string)
> > > > > > > > DateTime (hourly granularity)
> > > > > > > > Value
> > > > > > > >
> > > > > > > > I need to join the two sources by key and date (hour of day)
> to
> > > > > obtain:
> > > > > > > >
> > > > > > > > Key (string)
> > > > > > > > DateTime (hourly granularity)
> > > > > > > > ValueSource1
> > > > > > > > ValueSource2
> > > > > > > >
> > > > > > > > I think that first I'd need to push the messages in Kafka
> > topics
> > > > with
> > > > > > the
> > > > > > > > date as part of the key because I'll group by key taking into
> > > > account
> > > > > > the
> > > > > > > > date. So maybe the key must be a new string like
> key_timestamp.
> > > > But,
> > > > > of
> > > > > > > > course, it is not the main problem, is just an additional
> > > > > explanation.
> > > > > > > >
> > > > > > > > Ok, so data are in topics, here we go!
> > > > > > > >
> > > > > > > > - Multiple records allows per key but only the latest value
> > for a
> > > > > > record
> > > > > > > > key will be considered. I should use two KTable with some
> join
> > > > > > strategy,
> > > > > > > > right?
> > > > > > > >
> > > > > > > > - Data of both sources could arrive at any time. What can I
> do
> > to
> > > > > > achieve
> > > > > > > > this?
> > > > > > > >
> > > > > > > > Thanks in advance.
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to