IIRC, the PageViewTypedDemo example expects input data where the
username/userId is the key of each message/record and the remaining
information is in the value of that record.

The problem you are running into is that, when you write your input
data via the console producer, the records you generate only have
values; the keys are null because you don't specify any explicitly.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
>
> {"region":"CA", "timestamp":1435278171139}

And you have the same issue for the other topic, "streams-pageview-input".

To enter keys, you need to add some CLI options to the console producer.

Example:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 \
                                    --topic streams-userprofile-input \
                                    --property parse.key=true \
                                    --property key.separator=,

    firstUser,firstValue
    secondUser,secondValue
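
For the demo topics, a slightly more concrete sketch might look like the
following. It assumes the record key is the user id (here "1", matching the
"user" field in your page view sample), which I have not re-checked against
the demo source. It also uses "|" as the key separator so that it is not
confused with the commas inside the JSON values. keyed_line is a hypothetical
helper, not something that ships with Kafka.

```shell
#!/bin/sh
# keyed_line is a hypothetical helper (not part of Kafka): it prints
# "<key><separator><value>", the line format the console producer
# expects when parse.key=true is set.
keyed_line() {
  # $1 = record key, $2 = record value, $3 = separator (defaults to "|")
  printf '%s%s%s\n' "$1" "${3:-|}" "$2"
}

# Assumption: the key is the user id, here "1".
keyed_line 1 '{"region":"CA", "timestamp":1435278171139}'
keyed_line 1 '{"user":"1", "page":"22", "timestamp":1435278171111}'

# To send such a line to the broker (needs a running Kafka, so it is
# left commented out here):
# keyed_line 1 '{"region":"CA", "timestamp":1435278171139}' | \
#   bin/kafka-console-producer.sh --broker-list localhost:9092 \
#     --topic streams-userprofile-input \
#     --property parse.key=true \
#     --property "key.separator=|"
```

The first call prints 1|{"region":"CA", "timestamp":1435278171139}, which is
the line you would type (or pipe) into the console producer for
streams-userprofile-input. The second line is the equivalent for
streams-pageview-input.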

Hope this helps,
Michael




On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <shan2n...@gmail.com>
wrote:

> I have started exploring the Kafka Streams API. I'm trying to run the
> PageViewTypedDemo program as is, without any changes, locally on a
> desktop. The current Kafka version is 0.10.1.0.
>
> With the following inputs from 2 different consoles,
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-pageview-input
>
> {"user":"1", "page":"22", "timestamp":1435278171111}
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
>
> {"region":"CA", "timestamp":1435278171139}
>
> The error is
>
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streams-userprofile-input, partition=0, offset=0
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:200)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> Caused by: org.apache.kafka.streams.errors.StreamsException: Record key for the source KTable from store name streams-userprofile-store-name should not be null.
>     at org.apache.kafka.streams.kstream.internals.KTableSource$MaterializedKTableSourceProcessor.process(KTableSource.java:83)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>     ... 2 more
>
>
> Can someone help? Is there anything else to be done apart from creating the
> 2 topics streams-pageview-input & streams-userprofile-input?
>
