IIRC the PageViewTypedDemo example requires input data where the user ID is captured in the key of each record, and the further information in the record's value.
The problem you are running into is that, when you are writing your input data via the console producer, the records you are generating only have values -- the keys are null because you don't specify any explicitly.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
>
> {"region":"CA", "timestamp":1435278171139}

And you have the same issue for the other topic, "streams-pageview-input".

To enter keys, you need to add some CLI options to the console producer.

Example:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 \
                                    --topic streams-userprofile-input \
                                    --property parse.key=true \
                                    --property key.separator=,

    firstUser,firstValue
    secondUser,secondValue

Hope this helps,
Michael

On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <shan2n...@gmail.com> wrote:

> I have started exploring kafka streaming API. I'm trying to run PageViewTypedDemo program as it is without any changes locally on a desktop. Current kafka version is 0.10.1.0.
>
> With the following inputs from 2 different console,
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-pageview-input
>
> {"user":"1", "page":"22", "timestamp":1435278171111}
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
>
> {"region":"CA", "timestamp":1435278171139}
>
> The error is
>
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streams-userprofile-input, partition=0, offset=0
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:200)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> Caused by: org.apache.kafka.streams.errors.StreamsException: Record key for the source KTable from store name streams-userprofile-store-name should not be null.
>     at org.apache.kafka.streams.kstream.internals.KTableSource$MaterializedKTableSourceProcessor.process(KTableSource.java:83)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>     ... 2 more
>
> Can someone help? Is there anything else to be done apart from creating the 2 topics streams-pageview-input & streams-userprofile-input?
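
For what it's worth, here is a rough sketch of what keyed input lines for the two topics could look like with the JSON from the question. It rests on two assumptions: that the user ID ("1" in the pageview example) is the right key for both topics, so that a pageview and a profile share a key, and that the console producer splits each line at the first occurrence of key.separator only. The latter is my understanding, but it is worth double-checking on 0.10.1.0, since the JSON values contain commas themselves. The keyed_line helper and the variable names are made up for illustration.

```shell
#!/bin/sh
# Sketch only: build keyed input lines for the two demo topics.
# Assumption: the user ID "1" is used as the record key on both topics.

SEP=','   # must match the value passed via --property key.separator=

# Hypothetical helper: join a key and a JSON value into one producer input line.
keyed_line() {
  printf '%s%s%s\n' "$1" "$SEP" "$2"
}

pageview_line=$(keyed_line '1' '{"user":"1", "page":"22", "timestamp":1435278171111}')
profile_line=$(keyed_line '1' '{"region":"CA", "timestamp":1435278171139}')

# Assuming a split at the first separator only, this is the key and value
# that would result for the profile line:
profile_key=${profile_line%%"$SEP"*}    # everything before the first comma
profile_value=${profile_line#*"$SEP"}   # everything after the first comma

printf 'line:  %s\n' "$profile_line"
printf 'key:   %s\n' "$profile_key"
printf 'value: %s\n' "$profile_value"

# To actually send a line (needs a running broker), pipe it into the producer:
#   printf '%s\n' "$profile_line" | bin/kafka-console-producer.sh \
#     --broker-list localhost:9092 --topic streams-userprofile-input \
#     --property parse.key=true --property key.separator=,
```

If the commas inside the JSON turn out to be a problem, picking a separator that cannot occur in the values (for example a tab, which I believe is the default) would sidestep it.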