1) It sounds your should be using KTable.outerjoin(KTable) with your case,
but keep in mind that currently we are still working on exactly-once
semantics, and hence currently the results may be ordering dependent.

We do not support windowing in KTable since itself is an ever-updating
changlog already, and hence its join result would also be a ever updating
changelog stream as KTable. Reading data from KTable where values with the
same key may not yet been compacted as fine, as long as the operation
itself is preserving :

F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key: b,
value: 4}

Here the resulted key values may be different, but the same key input will
generate the same key output. Then they are still changelog records for the
same key. All built-in KTable operators preserve this property. On the
other hand, if:

F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key: c,
value: 4}

The it is not key-preserving, and then you may encounter some unexpected
behavior.


2) log compaction is a Kafka broker feature that Kafka Streams leverage on:

https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction

It is done on disk files that are not active (i.e. no longer takes appends).

We are working on exposing the configs for log compactions such as
compaction intervals and thresholds in Kafka Streams so that users can
control its behavior. Actually, Henry do you mind creating a JIRA for this
purpose and list what you would like to control log compaction?


Guozhang





On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai <h...@pinterest.com.invalid>
wrote:

> Related to the log compaction question: " it will be log
> compacted on the key over time", how do we control the time for log
> compaction?  For the log compaction implementation, is the storage used to
> map a new value for a given key stored in memory or on disk?
>
> On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral <
> guillermo.lammers.cor...@tecsisa.com> wrote:
>
> > Hello,
> >
> > Thanks again for your reply :)
> >
> > 1) In my example when I send a record from outer table and there is no
> > matching record from inner table I receive data to the output topic and
> > vice versa. I am trying it with the topics empties at the first
> execution.
> > How is possible?
> >
> > Why KTable joins does not support windowing strategies? I think that for
> > this use cases I need it, what do you think?
> >
> > 2) What does it means? Although the log may not be yet compacted, there
> > should be no problem to read from them and execute a new stream process,
> > right? (like a new joins, counts...).
> >
> > Thanks!!
> >
> > 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> >
> > > 1) There are three types of joins for KTable-KTable join, the follow
> the
> > > same semantics in SQL joins:
> > >
> > > KTable.join(KTable): when there is no matching record from inner table
> > when
> > > received a new record from outer table, no output; and vice versa.
> > > KTable.leftjoin(KTable): when there is no matching record from inner
> > table
> > > when received a new record from outer table, output (a, null); on the
> > other
> > > direction no output.
> > > KTable.outerjoin(KTable): when there is no matching record from inner /
> > > outer table when received a new record from outer / inner table, output
> > (a,
> > > null) or (null, b).
> > >
> > >
> > > 2) The result topic is also a changelog topic, although it will be log
> > > compacted on the key over time, if you consume immediately the log may
> > not
> > > be yet compacted.
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
> > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thank you very much for your reply and sorry for the generic
> question,
> > > I'll
> > > > try to explain with some pseudocode.
> > > >
> > > > I have two KTable with a join:
> > > >
> > > > ktable1: KTable[String, String] = builder.table("topic1")
> > > > ktable2: KTable[String, String] = builder.table("topic2")
> > > >
> > > > result: KTable[String, ResultUnion] =
> > > > ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1,
> data2))
> > > >
> > > > I send the result to a topic result.to("resultTopic").
> > > >
> > > > My questions are related with the following scenario:
> > > >
> > > > - The streming is up & running without data in topics
> > > >
> > > > - I send data to "topic2", for example a key/value like that
> > > ("uniqueKey1",
> > > > "hello")
> > > >
> > > > - I see null values in topic "resultTopic", i.e. ("uniqueKey1", null)
> > > >
> > > > - If I send data to "topic1", for example a key/value like that
> > > > ("uniqueKey1", "world") then I see this values in topic
> "resultTopic",
> > > > ("uniqueKey1", ResultUnion("hello", "world"))
> > > >
> > > > Q: If we send data for one of the KTable that does not have the
> > > > corresponding data by key in the other one, obtain null values in the
> > > > result final topic is the expected behavior?
> > > >
> > > > My next step would be use Kafka Connect to persist result data in C*
> (I
> > > > have not read yet the Connector docs...), is this the way to do it?
> (I
> > > mean
> > > > prepare the data in the topic).
> > > >
> > > > Q: On the other hand, just to try, I have a KTable that read messages
> > in
> > > > "resultTopic" and prints them. If the stream is a KTable I am
> wondering
> > > why
> > > > is getting all the values from the topic even those with the same
> key?
> > > >
> > > > Thanks in advance! Great job answering community!
> > > >
> > > > 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> > > >
> > > > > Hi Guillermo,
> > > > >
> > > > > 1) Yes in your case, the streams are really a "changelog" stream,
> > hence
> > > > you
> > > > > should create the stream as KTable, and do KTable-KTable join.
> > > > >
> > > > > 2) Could elaborate about "achieving this"? What behavior do require
> > in
> > > > the
> > > > > application logic?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral <
> > > > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am a newbie to Kafka Streams and I am using it trying to solve
> a
> > > > > > particular use case. Let me explain.
> > > > > >
> > > > > > I have two sources of data both like that:
> > > > > >
> > > > > > Key (string)
> > > > > > DateTime (hourly granularity)
> > > > > > Value
> > > > > >
> > > > > > I need to join the two sources by key and date (hour of day) to
> > > obtain:
> > > > > >
> > > > > > Key (string)
> > > > > > DateTime (hourly granularity)
> > > > > > ValueSource1
> > > > > > ValueSource2
> > > > > >
> > > > > > I think that first I'd need to push the messages in Kafka topics
> > with
> > > > the
> > > > > > date as part of the key because I'll group by key taking into
> > account
> > > > the
> > > > > > date. So maybe the key must be a new string like key_timestamp.
> > But,
> > > of
> > > > > > course, it is not the main problem, is just an additional
> > > explanation.
> > > > > >
> > > > > > Ok, so data are in topics, here we go!
> > > > > >
> > > > > > - Multiple records allows per key but only the latest value for a
> > > > record
> > > > > > key will be considered. I should use two KTable with some join
> > > > strategy,
> > > > > > right?
> > > > > >
> > > > > > - Data of both sources could arrive at any time. What can I do to
> > > > achieve
> > > > > > this?
> > > > > >
> > > > > > Thanks in advance.
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang

Reply via email to