FYI: We updated the 0.10.0.x demos for Kafka Streams at https://github.com/confluentinc/examples to use input topics with more than one partition and to include `through()`.
See for example [1]. Hope this helps!

Michael

[1] https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java


On Thu, Sep 1, 2016 at 10:50 AM, Tommy Q <deeplam...@gmail.com> wrote:

It works after calling through() before countByKey. Many 0.10.0.1 examples on the web are missing the `through()` call, and they will not produce the right output when the input topic has more than one partition.

Thanks very much, all! I finally got the correct results.


On Thu, Sep 1, 2016 at 4:52 AM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi Tommy,

I checked out your GitHub project and can verify the "issue". As you are using Kafka 0.10.0.1, the automatic repartitioning step is not available.

If you use the "trunk" version, your program will run as expected. If you want to stay with 0.10.0.1, you need to repartition the data explicitly after map(), via a call to through():

    val wordCounts: KStream[String, JLong] = textLines
      .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
      .map((key: String, word: String) => new KeyValue(word, word))
      // Explicit repartitioning: write the re-keyed records to this topic and
      // read them back, so all records for one word end up in the same partition.
      .through("my-repartitioning-topic")
      .countByKey("counts")
      .toStream

Keep in mind that it is recommended to create all user topics manually. You should therefore create the repartitioning topic you specify in through() before you start your Kafka Streams application.

-Matthias


On 08/31/2016 09:07 PM, Guozhang Wang wrote:

Hello Tommy,

Which version of Kafka are you using?

Guozhang


On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q <deeplam...@gmail.com> wrote:

I cleaned up all the ZooKeeper and Kafka state and ran the WordCountDemo again. The results in wc-out are still wrong:

    a 1
    b 1
    a 1
    b 1
    c 1


On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll <mich...@confluent.io> wrote:

Can you double-check whether the results in wc-out are not rather:

    a 1
    b 1
    a 2
    b 2
    c 1

?


On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q <deeplam...@gmail.com> wrote:

I tried the word count example as discussed. The result in wc-out is wrong:

    a 1
    b 1
    a 1
    b 1
    c 1

The expected result should be:

    a 2
    b 2
    c 1

Kafka version is 0.10.0.1.


On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <matth...@confluent.io> wrote:

No. It does not support hidden topics.

The only explanation might be that there is no repartitioning step. But then the question would be whether there is a bug in Kafka Streams, because repartitioning is required between map() and countByKey().

Can you verify that the result is correct?

-Matthias


On 08/30/2016 03:24 PM, Tommy Q wrote:

Does Kafka support hidden topics? (Since all the topic information is stored in ZK, this is probably not the case.)


On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi Tommy,

yes, you do understand Kafka Streams correctly.
And yes, for shuffling, an internal topic will be created under the hood. It should be named "<application-id>-something-repartition". I am not sure why it is not listed via bin/kafka-topics.sh.

The internal topic "<application-id>-counts-changelog" that you see is created to back the state of the countByKey() operator.

See

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management

and

http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application

-Matthias


On 08/30/2016 06:55 AM, Tommy Q wrote:

Michael, thanks for your help.

Taking the word count example, I am trying to walk through the code based on your explanation:

    // Setup assumed by this snippet (Kafka 0.10.0.x API):
    import java.lang.{Long => JLong}
    import scala.collection.JavaConverters._
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}
    val builder = new KStreamBuilder
    val (stringSerde, longSerde) = (Serdes.String(), Serdes.Long())
    val textLines: KStream[String, String] = builder.stream("input-topic")
    val wordCounts: KStream[String, JLong] = textLines
      .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
      .map((key: String, word: String) => new KeyValue(word, word))
      .countByKey("counts")
      .toStream

    wordCounts.to(stringSerde, longSerde, "wc-out")

Suppose input-topic has two partitions and one string record is produced into each partition:

    input-topic_0 : "a b"
    input-topic_1 : "a b c"

Suppose we start two instances of the stream topology (task_0 and task_1).
After flatMapValues and map have executed, the tasks should have the following state:

    task_0 : [ (a, "a"), (b, "b") ]
    task_1 : [ (a, "a"), (b, "b"), (c, "c") ]

Before countByKey executes, the Kafka Streams framework should insert an invisible shuffle phase internally.

Shuffled across the network:

    _internal_topic_shuffle_0 : [ (a, "a"), (a, "a") ]
    _internal_topic_shuffle_1 : [ (b, "b"), (b, "b"), (c, "c") ]

countByKey (reduce):

    task_0 (counts-changelog_0) : [ (a, 2) ]
    task_1 (counts-changelog_1) : [ (b, 2), (c, 1) ]

After `wordCounts.to(stringSerde, longSerde, "wc-out")` executes, we get the word count output in the wc-out topic:

    task_0 (wc-out_0) : [ (a, 2) ]
    task_1 (wc-out_1) : [ (b, 2), (c, 1) ]

Based on the steps listed above, do I understand the internals of the KStream word count correctly?

Another question: does the shuffle across the network work by creating intermediate topics? If so, why can't I find the intermediate topics using `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can only see the counts-changelog topic that the Kafka Streams framework created.
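A small simulation can reproduce both Tommy's walkthrough and the wrong output he reported later in the thread. The Scala below is a hypothetical sketch that uses only the standard library, not Kafka Streams:

- `WordCountSimulation` and its methods are invented names.
- A "partition" is a plain `Seq`, and each task keeps its own count store.
- `repartition` stands in for the step that `through()` performs explicitly in 0.10.0.x.
- The partitioner is assumed to be `String.hashCode` modulo the partition count. Kafka's default partitioner works differently.

```scala
// Hypothetical, stdlib-only simulation of the word-count topology above.
// It does not use Kafka: a "partition" is a plain Seq of records and each
// "task" keeps its own local count store, like a Kafka Streams task does.
object WordCountSimulation {
  type Record = (String, String) // (key, value)

  // flatMapValues + map: split each line into words, then re-key by the word.
  def rekeyByWord(lines: Seq[String]): Seq[Record] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq)
      .map(word => (word, word))

  // countByKey(...).toStream, run independently in each task: every input
  // record emits the updated count for its key from that task's own store.
  def countPerTask(partitions: Seq[Seq[Record]]): Seq[(String, Long)] =
    partitions.flatMap { records =>
      val store = scala.collection.mutable.Map.empty[String, Long]
      records.map { case (key, _) =>
        val updated = store.getOrElse(key, 0L) + 1L
        store(key) = updated
        (key, updated)
      }
    }

  // Stand-in for through(): route every record to the partition chosen by a
  // hash of its key. (Assumption: String.hashCode is used here for brevity;
  // Kafka's default partitioner hashes the serialized key differently.)
  def repartition(partitions: Seq[Seq[Record]], numPartitions: Int): Seq[Seq[Record]] = {
    val all = partitions.flatten
    (0 until numPartitions).map { p =>
      all.filter { case (key, _) => java.lang.Math.floorMod(key.hashCode, numPartitions) == p }
    }
  }

  def main(args: Array[String]): Unit = {
    // The two input partitions from the walkthrough: "a b" and "a b c".
    val mapped = Seq(Seq("a b"), Seq("a b c")).map(rekeyByWord)
    println(s"without repartitioning: ${countPerTask(mapped)}")
    println(s"with repartitioning:    ${countPerTask(repartition(mapped, 2))}")
  }
}
```

Without repartitioning, each task counts only the words in its own partition. The updates are therefore a 1, b 1, a 1, b 1, c 1, the same result Tommy saw in wc-out. After `repartition(mapped, 2)`, every occurrence of a word sits in one partition, so the last update per key is a 2, b 2, c 1. The count emits one update per input record, so the output also contains intermediate values such as a 1 before a 2. This is the kind of output Michael asked about.

With `String.hashCode`, "b" lands on partition 0 and "a" and "c" on partition 1. That differs from the `_internal_topic_shuffle_*` layout Tommy sketched: the assignment depends on the partitioner, but the counts do not.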

On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io> wrote:

In Kafka Streams, data is partitioned according to the keys of the key-value records, and operations such as countByKey operate on these stream partitions. When reading data from Kafka, these stream partitions map to the partitions of the Kafka input topic(s), but this may change once you add processing operations.

To your question: the first step, if the data isn't already keyed as needed, is to select the key you want to count by, which results in 1+ output stream partitions. Here, data may get shuffled across the network (but it won't if there's no need to, e.g. when the data is already keyed as needed). Then the count operation is performed for each stream partition, which is similar to the sort-and-reduce phase in Hadoop.


On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:

Hi,

For the "word count" example in Hadoop, there are shuffle-sort-and-reduce phases that handle the outputs from different mappers. How does this work in KStream?
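Michael says data is shuffled only when it is not already keyed as needed. That rule can be written as a small planning check: insert a repartition step before a key-based aggregation only if an upstream operator may have changed the key. The Scala below is a toy model under that assumption. `RepartitionPlanner`, `Op`, and the `...Op` values are hypothetical names, not the Kafka Streams API. The real library's logic, including the automatic repartitioning Matthias mentions for trunk, may differ in detail.

```scala
// Toy model, NOT the Kafka Streams API: all names here are hypothetical.
// It decides whether a shuffle (repartition) is needed before a key-based
// aggregation such as countByKey.
object RepartitionPlanner {
  // One operator in a linear topology. `changesKey` marks operators such as
  // map() or selectKey() that may give a record a new key; flatMapValues()
  // and mapValues() only touch the value and keep the existing key.
  final case class Op(name: String, changesKey: Boolean)

  val flatMapValuesOp = Op("flatMapValues", changesKey = false)
  val mapValuesOp     = Op("mapValues", changesKey = false)
  val mapOp           = Op("map", changesKey = true)
  val selectKeyOp     = Op("selectKey", changesKey = true)

  // Returns the operator names in execution order, with a "repartition" step
  // inserted before the aggregation only if an upstream operator may have
  // changed the key (otherwise the partitions are already keyed as needed).
  def plan(ops: Seq[Op], aggregation: String): Seq[String] = {
    val needsShuffle = ops.exists(_.changesKey)
    val shuffle = if (needsShuffle) Seq("repartition") else Seq.empty[String]
    (ops.map(_.name) ++ shuffle) :+ aggregation
  }

  def main(args: Array[String]): Unit = {
    // Tommy's topology: map() re-keys every record by its word.
    println(plan(Seq(flatMapValuesOp, mapOp), "countByKey"))
    // Input already keyed as needed: only the value changes, so no shuffle.
    println(plan(Seq(mapValuesOp), "countByKey"))
  }
}
```

For Tommy's topology, `map` re-keys every record by its word, so `plan` places a repartition step before `countByKey`. In 0.10.0.1 this step has to be added by hand with `through()`. A topology that only uses `mapValues` keeps the input key, so no shuffle is planned.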