Hello Tommy,

Which version of Kafka are you using?
Guozhang

On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q <deeplam...@gmail.com> wrote:

> I cleaned up all the zookeeper & kafka states and ran the WordCountDemo
> again; the results in wc-out are still wrong:
>
>     a 1
>     b 1
>     a 1
>     b 1
>     c 1

On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll <mich...@confluent.io> wrote:

> Can you double-check whether the results in wc-out are not rather:
>
>     a 1
>     b 1
>     a 2
>     b 2
>     c 1
>
> ?

On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q <deeplam...@gmail.com> wrote:

> Tried the word count example as discussed; the result in wc-out is wrong:
>
>     a 1
>     b 1
>     a 1
>     b 1
>     c 1
>
> The expected result should be:
>
>     a 2
>     b 2
>     c 1
>
> Kafka version is 0.10.0.1.

On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> No. It does not support hidden topics.
>
> The only explanation might be that there is no repartitioning step. But
> then the question would be whether there is a bug in Kafka Streams,
> because between map() and countByKey() repartitioning is required.
>
> Can you verify that the result is correct?
>
> -Matthias

On 08/30/2016 03:24 PM, Tommy Q wrote:

> Does Kafka support hidden topics? (Since all the topic infos are stored
> in ZK, this is probably not the case.)

On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi Tommy,
>
> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
> an internal topic will be created under the hood. It should be named
> "<application-id>-something-repartition".
> I am not sure why it is not listed via bin/kafka-topics.sh.
>
> The internal topic "<application-id>-counts-changelog" you see is
> created to back the state of the countByKey() operator.
>
> See
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> and
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>
> -Matthias

On 08/30/2016 06:55 AM, Tommy Q wrote:

> Michael, thanks for your help.
>
> Take the word count example; I am trying to walk through the code based
> on your explanation:
>
>     val textLines: KStream[String, String] = builder.stream("input-topic")
>     val wordCounts: KStream[String, JLong] = textLines
>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>       .map((key: String, word: String) => new KeyValue(word, word))
>       .countByKey("counts")
>       .toStream
>
>     wordCounts.to(stringSerde, longSerde, "wc-out")
>
> Suppose the input-topic has two partitions and each partition has a
> string record produced into it:
>
>     input-topic_0 : "a b"
>     input-topic_1 : "a b c"
>
> Suppose we started two instances of the stream topology (task_0 and
> task_1).
> So after flatMapValues & map are executed, the tasks should have the
> following state:
>
>     task_0 : [ (a, "a"), (b, "b") ]
>     task_1 : [ (a, "a"), (b, "b"), (c, "c") ]
>
> Before the execution of countByKey, the kafka-stream framework should
> insert an invisible shuffle phase internally.
>
> Shuffled across the network:
>
>     _internal_topic_shuffle_0 : [ (a, "a"), (a, "a") ]
>     _internal_topic_shuffle_1 : [ (b, "b"), (b, "b"), (c, "c") ]
>
> countByKey (reduce):
>
>     task_0 (counts-changelog_0) : [ (a, 2) ]
>     task_1 (counts-changelog_1) : [ (b, 2), (c, 1) ]
>
> And after the execution of `wordCounts.to(stringSerde, longSerde,
> "wc-out")`, we get the word count output in the wc-out topic:
>
>     task_0 (wc-out_0) : [ (a, 2) ]
>     task_1 (wc-out_1) : [ (b, 2), (c, 1) ]
>
> According to the steps listed above, do I understand the internals of
> the KStream word count correctly?
>
> Another question: does the shuffle across the network work by creating
> intermediate topics? If so, why can't I find the intermediate topics
> using `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can
> only see the counts-changelog that got created by the kstream framework.

On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io> wrote:

> In Kafka Streams, data is partitioned according to the keys of the
> key-value records, and operations such as countByKey operate on these
> stream partitions.
> When reading data from Kafka, these stream partitions map to the
> partitions of the Kafka input topic(s), but these may change once you
> add processing operations.
>
> To your question: the first step, if the data isn't already keyed as
> needed, is to select the key you want to count by, which results in 1+
> output stream partitions. Here, data may get shuffled across the
> network (but it won't be if there's no need to, e.g. when the data is
> already keyed as needed). Then the count operation is performed for
> each stream partition, which is similar to the sort-and-reduce phase in
> Hadoop.

On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:

> Hi,
>
> For the "word count" example in Hadoop, there are
> shuffle-sort-and-reduce phases that handle the outputs from different
> mappers. How does this work in KStream?

--
-- Guozhang
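The map -> shuffle -> countByKey flow that Tommy walks through above can be simulated outside Kafka. The sketch below is a minimal stand-in for the streams runtime, written in plain Python rather than the thread's Scala; the partitioner is a hand-picked assumption that mirrors Tommy's example (all "a" records to partition 0), not Kafka's real hash-of-serialized-key partitioner.

```python
# Simulation of the word-count walkthrough in this thread: two input
# partitions, flatMap/map that re-keys each word by itself, a
# repartition ("shuffle") step, then a per-partition countByKey.
from collections import defaultdict

# input-topic_0 : "a b"     input-topic_1 : "a b c"
input_partitions = {0: ["a b"], 1: ["a b c"]}

# flatMapValues + map: split each line into words, key every word by itself
mapped = []
for lines in input_partitions.values():
    for line in lines:
        mapped.extend((w, w) for w in line.lower().split())

def partitioner(key):
    # Assumption for illustration: mirrors Tommy's example layout.
    # Kafka actually hashes the serialized key modulo the partition count.
    return 0 if key == "a" else 1

# shuffle: all records with the same key land in the same partition
shuffled = defaultdict(list)
for key, value in mapped:
    shuffled[partitioner(key)].append((key, value))

# countByKey: each task counts only its own partition, and emits one
# updated count per input record -- a changelog, not just final totals
changelog = []
counts = {}
for part in sorted(shuffled):
    for key, _ in shuffled[part]:
        counts[key] = counts.get(key, 0) + 1
        changelog.append((key, counts[key]))

print(changelog)               # [('a', 1), ('a', 2), ('b', 1), ('b', 2), ('c', 1)]
print(sorted(counts.items()))  # [('a', 2), ('b', 2), ('c', 1)]
```

Because countByKey emits an update per record, a correct run writes the whole changelog to wc-out, which is why Michael asks whether the topic actually contains "a 1, b 1, a 2, b 2, c 1" rather than only the final totals. The output Tommy reports ("a 1, b 1, a 1, b 1, c 1") is what you would get if the re-keyed records were never co-partitioned before counting, matching Matthias's "no repartitioning step" hypothesis.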