Hello Tommy,

Which version of Kafka are you using?

Guozhang

On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q <deeplam...@gmail.com> wrote:

> I cleaned up all the zookeeper & kafka states and run the WordCountDemo
> again, the results in wc-out is still wrong:
>
> a 1
> > b 1
> > a 1
> > b 1
> > c 1
>
>
>
> On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > Can you double-check whether the results in wc-out are not rather:
> >
> > a 1
> > b 1
> > a 2
> > b 2
> > c 1
> >
> > ?
> >
> > On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q <deeplam...@gmail.com> wrote:
> >
> > > Tried the word count example as discussed, the result in wc-out is
> wrong:
> > >
> > > a 1
> > > > b 1
> > > > a 1
> > > > b 1
> > > > c 1
> > >
> > >
> > > The expected result should be:
> > >
> > > a 2
> > > > b 2
> > > > c 1
> > >
> > >
> > > Kafka version is 0.10.0.1
> > >
> > >
> > > On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > >
> > > > No. It does not support hidden topics.
> > > >
> > > > The only explanation might be, that there is no repartitioning step.
> > But
> > > > than the question would be, if there is a bug in Kafka Streams,
> because
> > > > between map() and countByKey() repartitioning is required.
> > > >
> > > > Can you verify that the result is correct?
> > > >
> > > > -Matthias
> > > >
> > > > On 08/30/2016 03:24 PM, Tommy Q wrote:
> > > > > Does Kafka support hidden topics ? (Since all the topics infos are
> > > stored
> > > > > in ZK, this probably not the case )
> > > > >
> > > > > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Hi Tommy,
> > > > >>
> > > > >> yes, you do understand Kafka Streams correctly. And yes, for
> > > shuffling,
> > > > >> na internal topic will be created under the hood. It should be
> named
> > > > >> "<application-id>-something-repartition". I am not sure, why it
> is
> > > not
> > > > >> listed via bin/kafka-topics.sh
> > > > >>
> > > > >> The internal topic "<application-id>-counts-changelog" you see is
> > > > >> created to back the state of countByKey() operator.
> > > > >>
> > > > >> See
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/
> > > > >> Kafka+Streams%3A+Internal+Data+Management
> > > > >>
> > > > >> and
> > > > >>
> > > > >> http://www.confluent.io/blog/data-reprocessing-with-kafka-
> > > > >> streams-resetting-a-streams-application
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 08/30/2016 06:55 AM, Tommy Q wrote:
> > > > >>> Michael, Thanks for your help.
> > > > >>>
> > > > >>> Take the word count example, I am trying to walk through the code
> > > based
> > > > >> on
> > > > >>> your explanation:
> > > > >>>
> > > > >>>     val textLines: KStream[String, String] =
> > > > >> builder.stream("input-topic")
> > > > >>>     val wordCounts: KStream[String, JLong] = textLines
> > > > >>>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.
> > asJava)
> > > > >>>       .map((key: String, word: String) => new KeyValue(word,
> word))
> > > > >>>       .countByKey("counts")
> > > > >>>       .toStream
> > > > >>>
> > > > >>>     wordCounts.to(stringSerde, longSerde, "wc-out")
> > > > >>>
> > > > >>> Suppose the input-topic has two partitions and each partition
> has a
> > > > >> string
> > > > >>> record produced into:
> > > > >>>
> > > > >>> input-topic_0 : "a b"
> > > > >>>> input-topic_1 : "a b c"
> > > > >>>
> > > > >>>
> > > > >>> Suppose we started two instance of the stream topology ( task_0
> and
> > > > >>> task_1). So after flatMapValues & map executed, they should have
> > the
> > > > >>> following task state:
> > > > >>>
> > > > >>> task_0 :  [ (a, "a"), (b, "b") ]
> > > > >>>> task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
> > > > >>>
> > > > >>>
> > > > >>> Before the execution of  countByKey, the kafka-stream framework
> > > should
> > > > >>> insert a invisible shuffle phase internally:
> > > > >>>
> > > > >>> shuffled across the network :
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>>> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> > > > >>>> _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
> > > > >>>
> > > > >>>
> > > > >>> countByKey (reduce) :
> > > > >>>
> > > > >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> > > > >>>
> > > > >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> > > > >>>
> > > > >>>
> > > > >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> > > > >>> "wc-out")`, we get the word count output in wc-out topic:
> > > > >>>
> > > > >>> task_0 (wc-out_0) :  [ (a, 2) ]
> > > > >>>
> > > > >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> According the steps list above, do I understand the internals of
> > > > kstream
> > > > >>> word count correctly ?
> > > > >>> Another question is does the shuffle across the network work by
> > > > creating
> > > > >>> intermediate topics ? If so, why can't I find the intermediate
> > topics
> > > > >> using
> > > > >>> `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can
> > only
> > > > see
> > > > >>> the counts-changelog got created by the kstream framework.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <
> > mich...@confluent.io>
> > > > >> wrote:
> > > > >>>
> > > > >>>> In Kafka Streams, data is partitioned according to the keys of
> the
> > > > >>>> key-value records, and operations such as countByKey operate on
> > > these
> > > > >>>> stream partitions.  When reading data from Kafka, these stream
> > > > >> partitions
> > > > >>>> map to the partitions of the Kafka input topic(s), but these may
> > > > change
> > > > >>>> once you add processing operations.
> > > > >>>>
> > > > >>>> To your question:  The first step, if the data isn't already
> keyed
> > > as
> > > > >>>> needed, is to select the key you want to count by, which results
> > in
> > > 1+
> > > > >>>> output stream partitions.  Here, data may get shuffled across
> the
> > > > >> network
> > > > >>>> (but if won't if there's no need to, e.g. when the data is
> already
> > > > >> keyed as
> > > > >>>> needed).  Then the count operation is performed for each stream
> > > > >> partition,
> > > > >>>> which is similar to the sort-and-reduce phase in Hadoop.
> > > > >>>>
> > > > >>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com>
> > > wrote:
> > > > >>>>
> > > > >>>>> Hi,
> > > > >>>>>
> > > > >>>>> For "word count" example in Hadoop, there are
> > > shuffle-sort-and-reduce
> > > > >>>>> phases that handles outputs from different mappers, how does it
> > > work
> > > > in
> > > > >>>>> KStream ?
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>



-- 
-- Guozhang

Reply via email to