Thanks for the info. With

"page2",{"user":"2", "page":"22", "timestamp":1435278177777}

as input for streams-pageview-input and

"2",{"region":"CA","timestamp":1435278177777}

as input for streams-userprofile-input, the following error is shown:

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: JsonTimestampExtractor cannot recognize the record value org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$PageViewByRegion@4764b2e
        at org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor.extract(JsonTimestampExtractor.java:43)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:105)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
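For reference, here is a minimal shell sketch of how the first input line above would be divided into key and value. It assumes that the console producer, when run with parse.key=true, cuts each line at the first occurrence of key.separator; that behaviour is an assumption here, not something confirmed in this thread. The split_keyed_line helper is hypothetical and only mimics that rule, it is not part of Kafka.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): mimics splitting one input line at
# the FIRST occurrence of the key separator, which is the behaviour assumed
# for the console producer when parse.key=true.
split_keyed_line() {
  line=$1
  sep=$2
  key=${line%%"$sep"*}     # everything before the first separator
  value=${line#*"$sep"}    # everything after the first separator
}

# The pageview line from the message above, with "," as the separator.
split_keyed_line '"page2",{"user":"2", "page":"22", "timestamp":1435278177777}' ','
printf 'key=%s\nvalue=%s\n' "$key" "$value"
```

Under that assumption the commas inside the JSON value do no harm, because only the first comma is treated as the separator. The key, however, is sent with its double quotes included.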
Any example of the correct input value would be really appreciated.

Thanks

On Wed, Mar 22, 2017 at 12:27 PM, Michael Noll <mich...@confluent.io> wrote:

> IIRC the PageViewTypedDemo example requires input data where the
> username/userId is captured in the keys of messages/records, and further
> information in the values of those messages.
>
> The problem you are running into is that, when you are writing your input
> data via the console producer, the records you are generating only have
> values -- the keys are null because you don't specify any explicitly.
>
> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> > streams-userprofile-input
> >
> > {"region":"CA", "timestamp":1435278171139}
>
> And you have the same issue for the other topic, "streams-pageview-input".
>
> To enter keys, you need to add some CLI options to the console producer.
>
> Example:
>
>     $ bin/kafka-console-producer --broker-list localhost:9092 \
>         --topic streams-userprofile-input \
>         --property parse.key=true \
>         --property key.separator=,
>
>     firstUser,firstValue
>     secondUser,secondValue
>
> Hope this helps,
> Michael
>
> On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <shan2n...@gmail.com>
> wrote:
>
> > I have started exploring the Kafka Streams API. I'm trying to run the
> > PageViewTypedDemo program as it is, without any changes, locally on a
> > desktop. The current Kafka version is 0.10.1.0.
> >
> > With the following inputs from two different consoles,
> >
> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> > streams-pageview-input
> >
> > {"user":"1", "page":"22", "timestamp":1435278171111}
> >
> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> > streams-userprofile-input
> >
> > {"region":"CA", "timestamp":1435278171139}
> >
> > the error is
> >
> > Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streams-userprofile-input, partition=0, offset=0
> >         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:200)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: Record key for the source KTable from store name streams-userprofile-store-name should not be null.
> >         at org.apache.kafka.streams.kstream.internals.KTableSource$MaterializedKTableSourceProcessor.process(KTableSource.java:83)
> >         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >         ... 2 more
> >
> > Can someone help? Is there anything else to be done apart from creating
> > the 2 topics streams-pageview-input & streams-userprofile-input?