Thanks for the info.
With "page2",{"user":"2", "page":"22", "timestamp":1435278177777} as input
for streams-pageview-input   an
"2",{"region":"CA","timestamp":1435278177777} as input for
 streams-userprofile-input, the following error is shown,
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException:
JsonTimestampExtractor cannot recognize the record value
org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$PageViewByRegion@4764b2e
at
org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor.extract(JsonTimestampExtractor.java:43)
at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:105)
at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Any example on the correct input value is really appreciated.

Thanks

On Wed, Mar 22, 2017 at 12:27 PM, Michael Noll <mich...@confluent.io> wrote:

> IIRC the PageViewTypedDemo example requires input data where the
> username/userId is captured in the keys of messages/records, and further
> information in the values of those messages.
>
> The problem you are running into is that, when you are writing your input
> data via the console consumer, the records you are generating only have
> values -- the keys are null because you don't specify any explicitly.
>
> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> streams-userprofile-input
> >
> > {"region":"CA", "timestamp":1435278171139}
>
> And you have the same issue for the other topic, "streams-pageview-input".
>
> To enter keys, you need to add some CLI options to the console producer.
>
> Example:
>
>     $ bin/kafka-console-producer --broker-list localhost:9092 \
>                              --topic streams-userprofile-input \
>                              --property parse.key=true \
>                              --property key.separator=,
>
>     firstUser,firstValue
>     secondUser,secondValue
>
> Hope this helps,
> Michael
>
>
>
>
> On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <shan2n...@gmail.com>
> wrote:
>
> > I have started exploring kafka streaming API. I'm trying to  run
> > PageViewTypedDemo program as it is without any changes locally on a
> > desktop. Current kafka version is 0.10.1.0.
> >
> > With the following inputs from 2 different console,
> >
> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> > streams-pageview-input
> >
> > {"user":"1", "page":"22", "timestamp":1435278171111}
> >
> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> > streams-userprofile-input
> >
> > {"region":"CA", "timestamp":1435278171139}
> >
> > The error is
> >
> > Exception in thread "StreamThread-1"
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001,
> > topic=streams-userprofile-input, partition=0, offset=0
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > StreamTask.process(StreamTask.java:200)
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:436)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:242)
> >
> > Caused by: org.apache.kafka.streams.errors.StreamsException: Record key
> > for
> > the source KTable from store name streams-userprofile-store-name should
> not
> > be null.
> >
> > at
> > org.apache.kafka.streams.kstream.internals.KTableSource$
> > MaterializedKTableSourceProcessor.process(KTableSource.java:83)
> >
> > at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:204)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > SourceNode.process(SourceNode.java:66)
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > StreamTask.process(StreamTask.java:181)
> >
> > ... 2 more
> >
> >
> > Can someone help .Is there anything else to be done apart from creating
> the
> > 2 topics streams-pageview-input & streams-userprofile-input
> >
>

Reply via email to