Does Kafka support hidden topics? (Since all the topic information is stored in ZooKeeper, this is probably not the case.)
On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi Tommy,
>
> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
> an internal topic will be created under the hood. It should be named
> "<application-id>-something-repartition". I am not sure why it is not
> listed via bin/kafka-topics.sh.
>
> The internal topic "<application-id>-counts-changelog" you see is
> created to back the state of the countByKey() operator.
>
> See
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>
> and
>
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>
>
> -Matthias
>
>
> On 08/30/2016 06:55 AM, Tommy Q wrote:
>> Michael, thanks for your help.
>>
>> Take the word count example. I am trying to walk through the code based on
>> your explanation:
>>
>>   val textLines: KStream[String, String] = builder.stream("input-topic")
>>   val wordCounts: KStream[String, JLong] = textLines
>>     .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>>     .map((key: String, word: String) => new KeyValue(word, word))
>>     .countByKey("counts")
>>     .toStream
>>
>>   wordCounts.to(stringSerde, longSerde, "wc-out")
>>
>> Suppose the input-topic has two partitions and each partition has a string
>> record produced into it:
>>
>>   input-topic_0 : "a b"
>>   input-topic_1 : "a b c"
>>
>> Suppose we started two instances of the stream topology (task_0 and
>> task_1).
>> So after flatMapValues & map have executed, the tasks should have the
>> following state:
>>
>>   task_0 : [ (a, "a"), (b, "b") ]
>>   task_1 : [ (a, "a"), (b, "b"), (c, "c") ]
>>
>> Before countByKey executes, the Kafka Streams framework should insert an
>> invisible shuffle phase internally:
>>
>>   shuffled across the network:
>>
>>   _internal_topic_shuffle_0 : [ (a, "a"), (a, "a") ]
>>   _internal_topic_shuffle_1 : [ (b, "b"), (b, "b"), (c, "c") ]
>>
>>   countByKey (reduce):
>>
>>   task_0 (counts-changelog_0) : [ (a, 2) ]
>>   task_1 (counts-changelog_1) : [ (b, 2), (c, 1) ]
>>
>> And after `wordCounts.to(stringSerde, longSerde, "wc-out")` executes, we
>> get the word count output in the wc-out topic:
>>
>>   task_0 (wc-out_0) : [ (a, 2) ]
>>   task_1 (wc-out_1) : [ (b, 2), (c, 1) ]
>>
>> According to the steps listed above, do I understand the internals of
>> KStream word count correctly?
>> Another question: does the shuffle across the network work by creating
>> intermediate topics? If so, why can't I find the intermediate topics using
>> `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can only see
>> the counts-changelog topic created by the KStream framework.
>>
>>
>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io> wrote:
>>
>>> In Kafka Streams, data is partitioned according to the keys of the
>>> key-value records, and operations such as countByKey operate on these
>>> stream partitions. When reading data from Kafka, these stream partitions
>>> map to the partitions of the Kafka input topic(s), but this mapping may
>>> change once you add processing operations.
>>>
>>> To your question: The first step, if the data isn't already keyed as
>>> needed, is to select the key you want to count by, which results in 1+
>>> output stream partitions. Here, data may get shuffled across the network
>>> (but it won't if there's no need to, e.g.
>>> when the data is already keyed as needed). Then the count operation is
>>> performed for each stream partition, which is similar to the
>>> sort-and-reduce phase in Hadoop.
>>>
>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> For the "word count" example in Hadoop, there are shuffle-sort-and-reduce
>>>> phases that handle outputs from different mappers. How does this work in
>>>> KStream?
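The re-key, shuffle, then count flow that Michael describes and Tommy walks through can be sketched without a Kafka cluster. The following is a minimal plain-Scala simulation, not Kafka Streams code: the names `WordCountShuffleSketch`, `rekey`, `repartition` and `countLocally` are hypothetical, and `partitionFor` uses `String.hashCode` as a simplified stand-in for Kafka's default murmur2-based partitioner, so which partition a word lands on may differ from both a real cluster and Tommy's illustration.

```scala
// Plain-Scala simulation of the word-count data flow in this thread.
// No Kafka dependency; all names are hypothetical.
object WordCountShuffleSketch {

  type Partition = Vector[(String, String)]

  // Simplified stand-in for Kafka's default (murmur2-based) partitioner:
  // the only property that matters here is "same key => same partition".
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Step 1, local to each input partition (no network traffic):
  // flatMapValues + map turn each line into (word, word) records.
  def rekey(lines: Vector[String]): Partition =
    lines
      .flatMap(_.toLowerCase.split("\\W+").toSeq)
      .filter(_.nonEmpty)
      .map(word => (word, word))

  // Step 2, the shuffle: every re-keyed record is routed to the partition
  // of an intermediate (repartition) topic chosen by its new key.
  def repartition(tasks: Vector[Partition], numPartitions: Int): Vector[Partition] = {
    val buckets = Array.fill(numPartitions)(Vector.empty[(String, String)])
    for (task <- tasks; record <- task) {
      val p = partitionFor(record._1, numPartitions)
      buckets(p) = buckets(p) :+ record
    }
    buckets.toVector
  }

  // Step 3, the "reduce": each task counts only its own partition. Because
  // all occurrences of a word are now co-located, these counts are final.
  def countLocally(partition: Partition): Map[String, Long] =
    partition.groupBy(_._1).map { case (word, records) => word -> records.size.toLong }

  def main(args: Array[String]): Unit = {
    // input-topic_0 holds "a b", input-topic_1 holds "a b c"
    val inputTopic = Vector(Vector("a b"), Vector("a b c"))
    val shuffled = repartition(inputTopic.map(rekey), numPartitions = 2)
    shuffled.zipWithIndex.foreach { case (partition, i) =>
      println(s"task_$i counts: ${countLocally(partition)}")
    }
  }
}
```

With this stand-in hash, "b" goes to partition 0 while "a" and "c" go to partition 1, so task_0 counts b -> 2 and task_1 counts a -> 2 and c -> 1. The totals match Tommy's walkthrough; only the partition assignment differs. The point of the design is that the per-partition counts are correct only because the repartition step put every occurrence of a word into the same partition.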
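Matthias notes that the "<application-id>-counts-changelog" topic backs the state of the countByKey() operator. As a rough illustration of that idea only, the sketch below models a count store whose every update is also appended to a log, and shows that replaying the log rebuilds the same counts. All names (`ChangelogBackedCountSketch`, `CountStore`, `restore`) are hypothetical; here an in-memory `ArrayBuffer` stands in for the Kafka changelog topic, and none of the real store or topic machinery is modelled.

```scala
import scala.collection.mutable

// Simplified model of a count store backed by a changelog. No Kafka
// dependency; all names are hypothetical. The ArrayBuffer plays the role
// that a changelog topic plays in Kafka Streams.
object ChangelogBackedCountSketch {

  final class CountStore(val changelog: mutable.ArrayBuffer[(String, Long)]) {
    private val state = mutable.Map.empty[String, Long]

    // Increment the count for `key` and append the new value to the log.
    def increment(key: String): Long = {
      val updated = state.getOrElse(key, 0L) + 1L
      state(key) = updated
      changelog += (key -> updated)
      updated
    }

    def snapshot: Map[String, Long] = state.toMap
  }

  // Rebuild the counts (e.g. on another instance after a failure) by
  // replaying the log in order; later entries overwrite earlier ones.
  def restore(changelog: Seq[(String, Long)]): Map[String, Long] =
    changelog.foldLeft(Map.empty[String, Long]) { case (acc, (key, count)) =>
      acc.updated(key, count)
    }

  def main(args: Array[String]): Unit = {
    val log = mutable.ArrayBuffer.empty[(String, Long)]
    val store = new CountStore(log)
    Seq("a", "b", "a").foreach(store.increment)
    println(s"live store: ${store.snapshot}")
    println(s"restored:   ${restore(log.toSeq)}")
  }
}
```

Replaying in order leaves only the latest value per key, which is also what Kafka log compaction retains at minimum, so a compacted changelog is enough to rebuild a store of this shape.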