Correction: I am actually using 0.10.0, NOT 0.10.1 as I mentioned in my
last mail. I am running Kafka within a DC/OS cluster on AWS.

The setup that I said works fine is on my local machine, using a local
Kafka installation. It works in both the single-broker and multi-broker
scenarios.

Thanks.

On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hello -
>
> I am a beginner in Kafka, working on my first Kafka Streams application.
>
> I have a streams application that reads from a topic, does some
> transformation on the data and writes to another topic. The record that I
> manipulate is a CSV record.
>
> It runs fine when I run it on a local Kafka instance.
>
> However when I run it on an AWS cluster, I get the following exception
> when I try to produce the transformed record into the target topic.
>
> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> It looks like the timestamp passed to the ProducerRecord is -1, even though
> I am not passing any timestamp explicitly. I am not sure why this happens,
> but the Javadoc for ProducerRecord says the following:
>
>> The record also has an associated timestamp. If the user did not provide a
>> timestamp, the producer will stamp the record with its current time. The
>> timestamp eventually used by Kafka depends on the timestamp type configured
>> for the topic.
>> If the topic is configured to use CreateTime, the timestamp in the
>> producer record will be used by the broker.
>> If the topic is configured to use LogAppendTime, the timestamp in the
>> producer record will be overwritten by the broker with the broker local
>> time when it appends the message to its log.
>> In either of the cases above, the timestamp that has actually been used
>> will be returned to user in RecordMetadata
>
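
To make the quoted Javadoc concrete, here is a minimal plain-Java sketch of
the rules it describes. It is a model only, not Kafka code: the class
TimestampRules, its enum, and its method names are all hypothetical. The
one detail taken from the trace rather than from the Javadoc is the
"Invalid timestamp" message, which the trace shows being raised in the
ProducerRecord constructor.

```java
/** Hypothetical model of the timestamp rules quoted from the ProducerRecord Javadoc. */
final class TimestampRules {

    /** Stand-ins for the two topic timestamp types the Javadoc names. */
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * Timestamp carried by the producer record: the user's value if one was
     * given, otherwise the producer's current time. A negative value is
     * rejected, mirroring the exception message in the stack trace.
     */
    static long producerTimestamp(Long userTimestamp, long producerClockMs) {
        if (userTimestamp == null) {
            return producerClockMs;
        }
        if (userTimestamp < 0) {
            throw new IllegalArgumentException("Invalid timestamp " + userTimestamp);
        }
        return userTimestamp;
    }

    /** Timestamp finally stored for the record, depending on the topic's timestamp type. */
    static long storedTimestamp(TimestampType type, long producerTs, long brokerClockMs) {
        return type == TimestampType.LOG_APPEND_TIME ? brokerClockMs : producerTs;
    }

    public static void main(String[] args) {
        long producerTs = producerTimestamp(null, System.currentTimeMillis());
        System.out.println("CreateTime stores:    "
                + storedTimestamp(TimestampType.CREATE_TIME, producerTs, producerTs + 5));
        System.out.println("LogAppendTime stores: "
                + storedTimestamp(TimestampType.LOG_APPEND_TIME, producerTs, producerTs + 5));
    }
}
```

In this model the negative-timestamp check runs when the record is built,
before the topic's timestamp type plays any role. If the real client
behaves the same way, which the trace suggests but this sketch does not
prove, the topic setting alone would not prevent the exception.
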
>
>    1. Will this problem be solved if I configure the topic with
>    LogAppendTime or CreateTime explicitly?
>    2. What is the default setting of this property in a newly created
>    topic?
>    3. How do I change it (what is the name of the property to be set)?
>    4. Any idea why I face this problem in cluster mode but not in local
>    mode?
>
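
On question 4, one possible explanation, stated purely as an assumption
that nothing in this thread confirms: the records in the cluster's source
topic may carry no embedded timestamp (for example, if they were written
by a pre-0.10 producer or stored in an older message format). Such records
would show up with timestamp -1, and Streams would pass that value on to
the sink. If that is the cause, a workaround would be a timestamp
extractor that falls back to wall-clock time, plugged in through the
timestamp.extractor setting (Kafka Streams ships a
WallclockTimestampExtractor for the simplest case). The sketch below shows
only the fallback rule in plain Java; TimestampFallback and orWallClock
are hypothetical names, not Kafka APIs.

```java
/** Hypothetical fallback rule that a custom timestamp extractor could apply. */
final class TimestampFallback {

    /**
     * Returns the record's embedded timestamp when it is valid (non-negative),
     * otherwise the supplied wall-clock time.
     */
    static long orWallClock(long embeddedTimestampMs, long wallClockMs) {
        return embeddedTimestampMs >= 0 ? embeddedTimestampMs : wallClockMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A record without an embedded timestamp (-1) falls back to "now".
        System.out.println(orWallClock(-1L, now) == now);
        // A record with a valid timestamp keeps it.
        System.out.println(orWallClock(1477641420000L, now) == 1477641420000L);
    }
}
```

Falling back to wall-clock time changes the semantics from event time to
processing time for the affected records, so it is only a reasonable
choice if the application does not depend on event-time ordering.
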
> BTW I am using 0.10.1.
>
> Any help / pointers will be appreciated.
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
