I tried the word count example as discussed, but the result in wc-out is wrong:

a 1
b 1
a 1
b 1
c 1


The expected result should be:

a 2
b 2
c 1


Kafka version is 0.10.0.1
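
To make the difference between the two outputs concrete, here is a minimal sketch in plain Scala with no Kafka dependency. It counts the words from the two example records once per input partition, and once after re-routing every record by its key. The names `RepartitionSketch`, `partitionFor`, `countPerPartition` and `repartition` are made up for illustration, and `partitionFor` is a simple hash-modulo stand-in, not Kafka's actual partitioner.

```scala
object RepartitionSketch {
  // Hypothetical stand-in for a partitioner: route a key to one of n partitions.
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Count per key within each partition independently, as each task would.
  def countPerPartition(partitions: Seq[Seq[String]]): Seq[Map[String, Long]] =
    partitions.map(_.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong })

  // Re-route every record by its key so that equal keys share one partition.
  def repartition(partitions: Seq[Seq[String]], numPartitions: Int): Seq[Seq[String]] = {
    val all = partitions.flatten
    (0 until numPartitions).map(p => all.filter(w => partitionFor(w, numPartitions) == p))
  }

  def main(args: Array[String]): Unit = {
    // Words after flatMapValues/map for input-topic_0 ("a b") and input-topic_1 ("a b c").
    val input = Seq(Seq("a", "b"), Seq("a", "b", "c"))
    // Without repartitioning: each partition counts its own copy of "a" and "b".
    println(countPerPartition(input))
    // With repartitioning: each key is counted in exactly one partition.
    println(countPerPartition(repartition(input, 2)))
  }
}
```

Counting without the repartition step gives a=1, b=1 in one partition and a=1, b=1, c=1 in the other, which matches what I see in wc-out. Counting after the repartition step gives a=2, b=2, c=1 in total, which is the expected result.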


On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> No. It does not support hidden topics.
>
> The only explanation might be that there is no repartitioning step. But
> then the question would be whether there is a bug in Kafka Streams, because
> repartitioning is required between map() and countByKey().
>
> Can you verify that the result is correct?
>
> -Matthias
>
> On 08/30/2016 03:24 PM, Tommy Q wrote:
> > Does Kafka support hidden topics? (Since all topic information is stored
> > in ZK, this is probably not the case.)
> >
> > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi Tommy,
> >>
> >> Yes, you do understand Kafka Streams correctly. And yes, for shuffling,
> >> an internal topic will be created under the hood. It should be named
> >> "<application-id>-something-repartition". I am not sure why it is not
> >> listed via bin/kafka-topics.sh.
> >>
> >> The internal topic "<application-id>-counts-changelog" you see is
> >> created to back the state of the countByKey() operator.
> >>
> >> See
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> >>
> >> and
> >>
> >> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 08/30/2016 06:55 AM, Tommy Q wrote:
> >>> Michael, thanks for your help.
> >>>
> >>> Taking the word count example, I am trying to walk through the code
> >>> based on your explanation:
> >>>
> >>>     val textLines: KStream[String, String] = builder.stream("input-topic")
> >>>     val wordCounts: KStream[String, JLong] = textLines
> >>>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> >>>       .map((key: String, word: String) => new KeyValue(word, word))
> >>>       .countByKey("counts")
> >>>       .toStream
> >>>
> >>>     wordCounts.to(stringSerde, longSerde, "wc-out")
> >>>
> >>> Suppose input-topic has two partitions and one string record has been
> >>> produced into each of them:
> >>>
> >>> input-topic_0 : "a b"
> >>> input-topic_1 : "a b c"
> >>>
> >>>
> >>> Suppose we started two instances of the stream topology (task_0 and
> >>> task_1). After flatMapValues and map have executed, they should have
> >>> the following task state:
> >>>
> >>> task_0 :  [ (a, "a"), (b, "b") ]
> >>> task_1 :  [ (a, "a"), (b, "b"), (c, "c") ]
> >>>
> >>>
> >>> Before the execution of countByKey, the Kafka Streams framework should
> >>> insert an invisible shuffle phase internally.
> >>>
> >>> Shuffled across the network:
> >>>
> >>> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> >>> _internal_topic_shuffle_1 :  [ (b, "b"), (b, "b"), (c, "c") ]
> >>>
> >>>
> >>> countByKey (reduce) :
> >>>
> >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> >>>
> >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> >>>
> >>>
> >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> >>> "wc-out")`, we get the word count output in wc-out topic:
> >>>
> >>> task_0 (wc-out_0) :  [ (a, 2) ]
> >>>
> >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> >>>
> >>>
> >>>
> >>> According to the steps listed above, do I understand the internals of
> >>> the KStream word count correctly?
> >>> Another question: does the shuffle across the network work by creating
> >>> intermediate topics? If so, why can't I find the intermediate topics
> >>> using `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can
> >>> only see the counts-changelog topic that was created by the KStream
> >>> framework.
> >>>
> >>>
> >>>
> >>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io>
> >> wrote:
> >>>
> >>>> In Kafka Streams, data is partitioned according to the keys of the
> >>>> key-value records, and operations such as countByKey operate on these
> >>>> stream partitions.  When reading data from Kafka, these stream
> >>>> partitions map to the partitions of the Kafka input topic(s), but
> >>>> these may change once you add processing operations.
> >>>>
> >>>> To your question:  The first step, if the data isn't already keyed as
> >>>> needed, is to select the key you want to count by, which results in 1+
> >>>> output stream partitions.  Here, data may get shuffled across the
> >>>> network (but it won't if there's no need to, e.g. when the data is
> >>>> already keyed as needed).  Then the count operation is performed for
> >>>> each stream partition, which is similar to the sort-and-reduce phase
> >>>> in Hadoop.
> >>>>
> >>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> For the "word count" example in Hadoop, there are
> >>>>> shuffle-sort-and-reduce phases that handle the outputs from different
> >>>>> mappers. How does this work in KStream?
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
