Hi @Matthias & @Michael, I pushed the WordCountDemo code to GitHub:
https://github.com/deeplambda/kstream-debug. Can you help debug the demo?

Thanks,
Tommy

On Wed, Aug 31, 2016 at 11:47 AM, Tommy Q <deeplam...@gmail.com> wrote:

> Tried the word count example as discussed; the result in wc-out is wrong:
>
> a 1
> b 1
> a 1
> b 1
> c 1
>
>
> The expected result should be:
>
> a 2
> b 2
> c 1
>
>
> Kafka version is 0.10.0.1
>
>
> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> No. It does not support hidden topics.
>>
>> The only explanation might be that there is no repartitioning step. But
>> then the question would be whether there is a bug in Kafka Streams,
>> because repartitioning is required between map() and countByKey().
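>>
>> A quick way to see this hypothesis outside of Kafka: counting each input
>> partition independently, with no repartitioning step, reproduces exactly the
>> wrong output reported above. A minimal Python sketch (partition contents
>> taken from this thread; this is an illustration, not Kafka Streams code):

```python
from collections import Counter

# Input partitions as described earlier in the thread.
partitions = {
    "input-topic_0": "a b",
    "input-topic_1": "a b c",
}

# No repartitioning: each task counts only the words of its own partition,
# so the same key can appear once per partition instead of being merged.
def count_without_shuffle(partitions):
    out = []
    for _, text in sorted(partitions.items()):
        counts = Counter(text.lower().split())
        out.extend(sorted(counts.items()))
    return out

print(count_without_shuffle(partitions))
# -> [('a', 1), ('b', 1), ('a', 1), ('b', 1), ('c', 1)]
```

This matches the observed wc-out (a 1, b 1, a 1, b 1, c 1), which is what makes the missing-repartition explanation plausible.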
>>
>> Can you verify that the result is correct?
>>
>> -Matthias
>>
>> On 08/30/2016 03:24 PM, Tommy Q wrote:
>> > Does Kafka support hidden topics? (Since all the topic info is stored
>> > in ZK, this is probably not the case.)
>> >
>> > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> >> Hi Tommy,
>> >>
>> >> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
>> >> an internal topic will be created under the hood. It should be named
>> >> "<application-id>-something-repartition". I am not sure why it is not
>> >> listed via bin/kafka-topics.sh.
>> >>
>> >> The internal topic "<application-id>-counts-changelog" you see is
>> >> created to back the state of the countByKey() operator.
>> >>
>> >> See
>> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>> >>
>> >> and
>> >>
>> >> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 08/30/2016 06:55 AM, Tommy Q wrote:
>> >>> Michael, Thanks for your help.
>> >>>
>> >>> Take the word count example; I am trying to walk through the code
>> >>> based on your explanation:
>> >>>
>> >>>     val textLines: KStream[String, String] = builder.stream("input-topic")
>> >>>     val wordCounts: KStream[String, JLong] = textLines
>> >>>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>> >>>       .map((key: String, word: String) => new KeyValue(word, word))
>> >>>       .countByKey("counts")
>> >>>       .toStream
>> >>>
>> >>>     wordCounts.to(stringSerde, longSerde, "wc-out")
>> >>>
>> >>> Suppose the input-topic has two partitions, and one string record has
>> >>> been produced into each:
>> >>>
>> >>> input-topic_0 : "a b"
>> >>> input-topic_1 : "a b c"
>> >>>
>> >>>
>> >>> Suppose we started two instances of the stream topology (task_0 and
>> >>> task_1). After flatMapValues & map have executed, they should have the
>> >>> following task state:
>> >>>
>> >>> task_0 :  [ (a, "a"), (b, "b") ]
>> >>> task_1 :  [ (a, "a"), (b, "b"), (c, "c") ]
>> >>>
>> >>>
>> >>> Before the execution of countByKey, the Kafka Streams framework should
>> >>> insert an invisible shuffle phase internally (shuffled across the
>> >>> network):
>> >>>
>> >>> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
>> >>> _internal_topic_shuffle_1 :  [ (b, "b"), (b, "b"), (c, "c") ]
>> >>>
>> >>>
>> >>> countByKey (reduce) :
>> >>>
>> >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
>> >>>
>> >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
>> >>>
>> >>>
>> >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
>> >>> "wc-out")`, we get the word count output in wc-out topic:
>> >>>
>> >>> task_0 (wc-out_0) :  [ (a, 2) ]
>> >>>
>> >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
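>> >>>
>> >>> The walkthrough above can be simulated end to end in plain Python. This
>> >>> is a sketch, not Kafka Streams code: the real default partitioner hashes
>> >>> the serialized key (murmur2), so the exact key-to-partition assignment
>> >>> may differ from the one shown; a fixed toy hash is used here so the
>> >>> example is deterministic.

```python
from collections import Counter

NUM_PARTITIONS = 2

# Stand-in for Kafka's partitioner; real deployments hash the serialized
# key, so which partition each key lands in may differ from this toy rule.
def partition_for(key):
    return sum(map(ord, key)) % NUM_PARTITIONS

# Step 1: flatMapValues + map -- each task turns its line into (word, word).
tasks = {0: "a b", 1: "a b c"}
mapped = {t: [(w, w) for w in line.split()] for t, line in tasks.items()}

# Step 2: shuffle -- each pair is written to an internal repartition topic,
# partitioned by key, so all records with the same key land together.
shuffle = {p: [] for p in range(NUM_PARTITIONS)}
for pairs in mapped.values():
    for key, value in pairs:
        shuffle[partition_for(key)].append((key, value))

# Step 3: countByKey -- each task counts its own shuffle partition.
counts = {p: dict(Counter(k for k, _ in recs)) for p, recs in shuffle.items()}
print(counts)
# {0: {'b': 2}, 1: {'a': 2, 'c': 1}}
# Merged across partitions: a -> 2, b -> 2, c -> 1
```

The per-partition split differs from the hand-worked example (here b lands in partition 0), but the invariant is the same: every occurrence of a key ends up in one partition, so the merged totals come out to a 2, b 2, c 1.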
>> >>>
>> >>>
>> >>>
>> >>> According to the steps listed above, do I understand the internals of
>> >>> the KStream word count correctly?
>> >>> Another question: does the shuffle across the network work by creating
>> >>> intermediate topics? If so, why can't I find the intermediate topics
>> >>> using `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can
>> >>> only see the counts-changelog created by the KStream framework.
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io>
>> >> wrote:
>> >>>
>> >>>> In Kafka Streams, data is partitioned according to the keys of the
>> >>>> key-value records, and operations such as countByKey operate on these
>> >>>> stream partitions.  When reading data from Kafka, these stream
>> >>>> partitions map to the partitions of the Kafka input topic(s), but
>> >>>> these may change once you add processing operations.
>> >>>>
>> >>>> To your question:  The first step, if the data isn't already keyed as
>> >>>> needed, is to select the key you want to count by, which results in 1+
>> >>>> output stream partitions.  Here, data may get shuffled across the
>> >>>> network (but it won't if there's no need to, e.g. when the data is
>> >>>> already keyed as needed).  Then the count operation is performed for
>> >>>> each stream partition, which is similar to the sort-and-reduce phase
>> >>>> in Hadoop.
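>> >>>>
>> >>>> The "shuffle only when the key changes" rule can be sketched as
>> >>>> follows. This is my own simplification: Kafka Streams decides by
>> >>>> tracking whether a key-changing operation (map/selectKey/flatMap)
>> >>>> precedes the aggregation in the topology, and the function name here
>> >>>> is made up for illustration.

```python
def needs_repartition(operations):
    """Return True if any key-changing operation occurred before the
    aggregation - a simplified model of when a shuffle is required."""
    KEY_CHANGING = {"map", "selectKey", "flatMap", "transform"}
    return any(op in KEY_CHANGING for op in operations)

# The word-count topology changes the key in map(), so countByKey must
# repartition (shuffle) first:
print(needs_repartition(["flatMapValues", "map"]))        # True
# A topology using only value-preserving operations keeps the original
# partitioning, so no shuffle is needed:
print(needs_repartition(["flatMapValues", "mapValues"]))  # False
```

This is why the word-count example, which re-keys records by word via map(), implies a network shuffle before the count, while a pure mapValues() pipeline would not.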
>> >>>>
>> >>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> For the "word count" example in Hadoop, there are
>> >>>>> shuffle-sort-and-reduce phases that handle outputs from different
>> >>>>> mappers. How does this work in KStream?
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>
>
