Hi,

Kafka Stream is under heavy development, including better documentation
etc. Maybe the docs do not explain all internals yet. We also work in
improving the wiki documentation
(https://cwiki.apache.org/confluence/display/KAFKA/Index) -- the wiki
covers more of the internal design (some stuff might be out dated --
many stuff is still missing).

Having said this, thanks for you feedback -- it helps us do find gaps in
the documentation!
Furthermore, end-to-end semantics is also on the roadmap and I am sure,
there will be a blog post (no estimate when though).

To your questions (answers for 0.10.0.0 -- some things get changed in
next release):

1) internal topics are created for two reasons
   a) re-partitioning of data (for group-by; in next release also for joins)
   b) back-up topics (called "changelog" topics) for KTables (only if
created via an aggregation or join -- not if read via table(...) -- in
next release always) -- changelog topics are used to re-create RocksDB
store on recovery and for scaling purpose

So yes, there might be some/many internally created topics. Those topics
will never be deleted. If you do not need old internal topics any more
(because you do not run your application anymore), you need to delete
them manually.

2) internal topics are named like "<application-ID>-XXX-changelog" or
"<application-ID>-XXX-repartition" where <application-ID>---well, is
your application ID--- and XXX is the name of the operator (as specified
in your code) plus counting operator index (to avoid name collisions).

In theory, those topics could conflict with other topics -- but in
practice, the uses naming schema should avoid naming conflicts.
Especially, the application-id prefix avoid conflict with other Kafka
Streams applications.

For manual deletion, just search for topics starting with your
application-id.

3) You have no control over internal topics. However, you can avoid
repartitioning topics if you create those topic manually (and thus
control them completely) before you start your application, and do the
repartitioning explicitly in your code via
.through("your-repartioning-topic").

Ie. instead of

> builder.stream(...).map(/* we set a new key here */).groupBy()

you do

> builder.stream(...).map(/* we set a new key here */)..through(...).groupBy()

For the second case, Kafka Streams detects that the data is partitioned
correctly already, and thus, groupBy() does not create an internal
re-partitioning topic.


I hope I did cover all your question. If not, just follow up with more
question :)


-Matthias


On 07/21/2016 08:45 PM, Michael-Keith Bernard wrote:
> Hey Matthias,
> 
> Thanks for the quick reply! Unfortunately that's still a very unsatisfying 
> answer and I'm hoping you or someone else can shed a bit more light on the 
> internals here. First of all, I've read through the documentation (some parts 
> closer than others, so 100% possible I flat out missed it) and I don't recall 
> seeing any mentions of topics being implicitly created by stateful 
> operations. The docs talk at some length about the local state stores 
> necessary for aggregations and joins, but says little about 
> aggregating/joining data that has been re-keyed partway through the topology 
> (such as the word count example I linked in my original post). Furthermore, 
> if it really is creating topics implicitly, then does the user have any 
> control over the configuration of these topics? The number of partitions? The 
> lifespan of the topic? Does this mean that a user of Kafka Streams could 
> inadvertently create dozens of intermediate topics with only a moderately 
> complex topology? Which KStream/KTable operations create implicit topics? How 
> do the consumers coordinate naming the anonymous intermediate topics such 
> that they all use the same correct one without conflict with other topologies 
> running against the same cluster? Perhaps most importantly, where 
> specifically is all of this behavior documented (again, I fully admit I may 
> have just skimmed over it)?
>  
> I'm happy to go diving through code if the documentation for this simply 
> doesn't exist yet or is otherwise in flux, so some pointers on where to get 
> started would be greatly appreciated. Finally, towards the end of the 
> original Kafka Streams blog post, the author (Jay) mentions diving further 
> into the end-to-end semantics of Kafka Streams. Is that documentation/blog 
> post still coming? Is there anything I can read now about how at least once 
> delivery is guaranteed by Kafka Streams?
> 
> Cheers,
> Michael-Keith
> 
> 
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Thursday, July 21, 2016 7:31 AM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams: Merging of partial results
>     
> Hi,
> 
> you answered your question absolutely correctly by yourself (ie,
> internal topic creating in groupBy() to repartition the data on words).
> I cannot add more to explain how it works.
> 
> You might want to have a look here for more details about Kafka Streams
> in general:
> 
> http://docs.confluent.io/3.0.0/streams/index.html
> 
> 
> Kafka Streams — Confluent Platform 3.0.0 documentation
> docs.confluent.io
> Kafka Streams¶ This section describes Kafka Streams, a component of open 
> source Apache Kafka. Kafka Streams is a powerful, easy-to-use library for 
> building highly ...
> 
> 
> 
> -Matthias
> 
> 
> On 07/21/2016 04:16 PM, Michael-Keith Bernard wrote:
>> Hello Kafka Users,
>>
>> (I apologize if this landed in your inbox twice, I sent it yesterday 
>> afternoon but it never showed up in the archive so I'm sending again just in 
>> case.)
>>
>> I've been floating this question around the #apache-kafka IRC channel on 
>> Freenode for the last week or two and I still haven't reached a satisfying 
>> answer. The basic question is: How does Kafka Steams merge partial results? 
>> So let me expand on that a bit...
>>
>> Consider the following word count example in the official Kafka Streams repo 
>> (Github mirror):  
>> https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java#L46
>>
>> Now suppose we run this topology against a Kafka topic with 2 partitions. 
>> Into the first partition, we insert the sentence "hello world". Into the 
>> second partition, we insert the sentence "goodbye world". We know a priori 
>> that the result of this computation  is something like:
>>
>> { hello: 1, goodbye: 1, world: 2 } # made up syntax for a compacted 
>> log/KTable state
>>
>> And indeed we'd probably see precisely that result from a *single* consumer 
>> process that sees *all* the data. However, my question is, what if I have 1 
>> consumer per topic partition (2 consumers total in the above hypothetical)? 
>> Under that scenario, consumer  1 would emit { hello: 1, world: 1 } and 
>> consumer 2 would emit { goodbye: 1, world: 1 }... But the true answer 
>> requires and additional reduction of duplicate keys (in this case with a sum 
>> operator, but that needn't be the case for arbitrary aggregations).
>>
>> So again my question is, how are the partial results that each consumer 
>> generates merged into a final result? A simple way to accomplish this would 
>> be to produce an intermediate topic that is keyed by the word, then 
>> aggregate that (since each consumer would  see all the data for a given 
>> key), but if that's happening it's not explicit anywhere in the example. So 
>> what mechanism is Kafka Streams using internally to aggregate the results of 
>> a partitioned stream across multiple consumers? (Perhaps groupByKey creating 
>>  an anonymous intermediate topic?)
>>
>> I know that's a bit wordy, but I want to make sure my question is extremely 
>> clear. If I've still fumbled on that, let me know and I will try to be even 
>> more explicit. :)
>>
>> Cheers,
>> Michael-Keith Bernard
>>
>> P.S. Kafka is awesome and Kafka Streams look even awesome-er!
>>
> 
>     
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to