Hi @Matthias & @Michael, I pushed the WordCountDemo code to GitHub: https://github.com/deeplambda/kstream-debug. Can you help debug the demo?
Thanks,
Tommy

On Wed, Aug 31, 2016 at 11:47 AM, Tommy Q <deeplam...@gmail.com> wrote:
> Tried the word count example as discussed, the result in wc-out is wrong:
>
>     a 1
>     b 1
>     a 1
>     b 1
>     c 1
>
> The expected result should be:
>
>     a 2
>     b 2
>     c 1
>
> Kafka version is 0.10.0.1
>
> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>> No. It does not support hidden topics.
>>
>> The only explanation might be that there is no repartitioning step. But
>> then the question would be whether there is a bug in Kafka Streams,
>> because between map() and countByKey() repartitioning is required.
>>
>> Can you verify that the result is correct?
>>
>> -Matthias
>>
>> On 08/30/2016 03:24 PM, Tommy Q wrote:
>>> Does Kafka support hidden topics? (Since all the topic info is stored
>>> in ZK, this is probably not the case.)
>>>
>>> On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>> Hi Tommy,
>>>>
>>>> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
>>>> an internal topic will be created under the hood. It should be named
>>>> "<application-id>-something-repartition". I am not sure why it is not
>>>> listed via bin/kafka-topics.sh.
>>>>
>>>> The internal topic "<application-id>-counts-changelog" you see is
>>>> created to back the state of the countByKey() operator.
>>>>
>>>> See
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>>>> and
>>>> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>>>>
>>>> -Matthias
>>>>
>>>> On 08/30/2016 06:55 AM, Tommy Q wrote:
>>>>> Michael, thanks for your help.
>>>>> Take the word count example, I am trying to walk through the code
>>>>> based on your explanation:
>>>>>
>>>>>     val textLines: KStream[String, String] = builder.stream("input-topic")
>>>>>     val wordCounts: KStream[String, JLong] = textLines
>>>>>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>>>>>       .map((key: String, word: String) => new KeyValue(word, word))
>>>>>       .countByKey("counts")
>>>>>       .toStream
>>>>>
>>>>>     wordCounts.to(stringSerde, longSerde, "wc-out")
>>>>>
>>>>> Suppose the input topic has two partitions and each partition has one
>>>>> string record produced into it:
>>>>>
>>>>>     input-topic_0 : "a b"
>>>>>     input-topic_1 : "a b c"
>>>>>
>>>>> Suppose we started two instances of the stream topology (task_0 and
>>>>> task_1). After flatMapValues and map execute, they should have the
>>>>> following task state:
>>>>>
>>>>>     task_0 : [ (a, "a"), (b, "b") ]
>>>>>     task_1 : [ (a, "a"), (b, "b"), (c, "c") ]
>>>>>
>>>>> Before the execution of countByKey, the Kafka Streams framework should
>>>>> insert an invisible shuffle phase internally, shuffled across the
>>>>> network:
>>>>>
>>>>>     _internal_topic_shuffle_0 : [ (a, "a"), (a, "a") ]
>>>>>     _internal_topic_shuffle_1 : [ (b, "b"), (b, "b"), (c, "c") ]
>>>>>
>>>>> countByKey (reduce):
>>>>>
>>>>>     task_0 (counts-changelog_0) : [ (a, 2) ]
>>>>>     task_1 (counts-changelog_1) : [ (b, 2), (c, 1) ]
>>>>>
>>>>> And after the execution of `wordCounts.to(stringSerde, longSerde,
>>>>> "wc-out")`, we get the word count output in the wc-out topic:
>>>>>
>>>>>     task_0 (wc-out_0) : [ (a, 2) ]
>>>>>     task_1 (wc-out_1) : [ (b, 2), (c, 1) ]
>>>>>
>>>>> According to the steps listed above, do I understand the internals of
>>>>> the KStream word count correctly?
>>>>> Another question: does the shuffle across the network work by creating
>>>>> intermediate topics?
>>>>> If so, why can't I find the intermediate topics using
>>>>> `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can only
>>>>> see the counts-changelog that got created by the KStream framework.
>>>>>
>>>>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io> wrote:
>>>>>> In Kafka Streams, data is partitioned according to the keys of the
>>>>>> key-value records, and operations such as countByKey operate on these
>>>>>> stream partitions. When reading data from Kafka, these stream
>>>>>> partitions map to the partitions of the Kafka input topic(s), but
>>>>>> these may change once you add processing operations.
>>>>>>
>>>>>> To your question: The first step, if the data isn't already keyed as
>>>>>> needed, is to select the key you want to count by, which results in
>>>>>> 1+ output stream partitions. Here, data may get shuffled across the
>>>>>> network (but it won't if there's no need to, e.g. when the data is
>>>>>> already keyed as needed). Then the count operation is performed for
>>>>>> each stream partition, which is similar to the sort-and-reduce phase
>>>>>> in Hadoop.
>>>>>>
>>>>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> For the "word count" example in Hadoop, there are
>>>>>>> shuffle-sort-and-reduce phases that handle outputs from different
>>>>>>> mappers. How does it work in KStream?
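[Editor's note: the repartition-then-count semantics discussed in this thread can be sketched in plain Scala, with no Kafka involved. `WordCountSketch` and the hash-based partition routing are assumptions for illustration only; Kafka's default partitioner (murmur2 over the serialized key) differs in detail.]

```scala
// Plain-Scala sketch of map -> shuffle (repartition) -> countByKey,
// using the same two-partition input as the walkthrough in the thread.
// NOTE: illustrative only; not Kafka Streams API code.
object WordCountSketch {
  // Two input partitions, one record each, as in the example above.
  val inputPartitions: Seq[Seq[String]] = Seq(Seq("a b"), Seq("a b c"))

  // flatMapValues + map: split each line into (word, word) pairs.
  val mapped: Seq[(String, String)] =
    inputPartitions.flatten
      .flatMap(_.toLowerCase.split("\\W+"))
      .map(w => (w, w))

  // The "shuffle": route each record to a stream partition by hashing its
  // key, mimicking the internal repartition topic (assumed routing scheme).
  val numPartitions = 2
  val shuffled: Map[Int, Seq[(String, String)]] =
    mapped.groupBy { case (k, _) => math.abs(k.hashCode) % numPartitions }

  // countByKey: each task counts the keys of its own partition locally;
  // flattening here just merges the per-partition results for display.
  val counts: Map[String, Long] =
    shuffled.values.flatten.toSeq
      .groupBy(_._1)
      .map { case (k, vs) => (k, vs.size.toLong) }

  def main(args: Array[String]): Unit =
    counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w $c") }
    // prints:
    // a 2
    // b 2
    // c 1
}
```

Because every occurrence of a key lands in exactly one partition after the shuffle, each task's local count is also the global count for its keys, which is why the expected output is `a 2, b 2, c 1` rather than per-partition counts of 1.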