Michael, Thanks for your help.

Take the word count example, I am trying to walk through the code based on
your explanation:

    val textLines: KStream[String, String] = builder.stream("input-topic")
    val wordCounts: KStream[String, JLong] = textLines
      .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
      .map((key: String, word: String) => new KeyValue(word, word))
      .countByKey("counts")
      .toStream

    wordCounts.to(stringSerde, longSerde, "wc-out")

Suppose the input-topic has two partitions and each partition has a string
record produced into:

input-topic_0 : "a b"
> input-topic_1 : "a b c"


Suppose we started two instance of the stream topology ( task_0 and
task_1). So after flatMapValues & map executed, they should have the
following task state:

task_0 :  [ (a, "a"), (b, "b") ]
> task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]


Before the execution of  countByKey, the kafka-stream framework should
insert a invisible shuffle phase internally:

shuffled across the network :
>


> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]


countByKey (reduce) :

task_0 (counts-changelog_0) :  [ (a, 2) ]

task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]


And after the execution of `wordCounts.to(stringSerde, longSerde,
"wc-out")`, we get the word count output in wc-out topic:

task_0 (wc-out_0) :  [ (a, 2) ]

task_1 (wc-out_1):   [ (b, 2), (c, 1) ]



According the steps list above, do I understand the internals of kstream
word count correctly ?
Another question is does the shuffle across the network work by creating
intermediate topics ? If so, why can't I find the intermediate topics using
`bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can only see
the counts-changelog got created by the kstream framework.



On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io> wrote:

> In Kafka Streams, data is partitioned according to the keys of the
> key-value records, and operations such as countByKey operate on these
> stream partitions.  When reading data from Kafka, these stream partitions
> map to the partitions of the Kafka input topic(s), but these may change
> once you add processing operations.
>
> To your question:  The first step, if the data isn't already keyed as
> needed, is to select the key you want to count by, which results in 1+
> output stream partitions.  Here, data may get shuffled across the network
> (but if won't if there's no need to, e.g. when the data is already keyed as
> needed).  Then the count operation is performed for each stream partition,
> which is similar to the sort-and-reduce phase in Hadoop.
>
> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:
>
> > Hi,
> >
> > For "word count" example in Hadoop, there are shuffle-sort-and-reduce
> > phases that handles outputs from different mappers, how does it work in
> > KStream ?
> >
>

Reply via email to