Hello -

I am a beginner in Kafka .. with my first Kafka streams application ..

I have a streams application that reads from a topic, does some
transformation on the data and writes to another topic. The record that I
manipulate is a CSV record.

It runs fine when I run it on a local Kafka instance.

However when I run it on an AWS cluster, I get the following exception when
I try to produce the transformed record into the target topic.

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException:
Invalid timestamp -1
at
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
at
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at
org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
at
org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Looks like the timestamp passed to the ProducerRecord is -1, though I am
not passing any timestamp explicitly. I am not sure why this happens. But I
see that the Javadoc for ProducerRecord says the following ..

The record also has an associated timestamp. If the user did not provide a
> timestamp, the producer will stamp the record with its current time. The
> timestamp eventually used by Kafka depends on the timestamp type configured
> for the topic.
> If the topic is configured to use CreateTime, the timestamp in the
> producer record will be used by the broker.
> If the topic is configured to use LogAppendTime, the timestamp in the
> producer record will be overwritten by the broker with the broker local
> time when it appends the message to its log.
> In either of the cases above, the timestamp that has actually been used
> will be returned to user in RecordMetadata


   1. Will this problem be solved if I configure the topic with
   LogAppendTime or CreateTime explicitly ?
   2. What is the default setting of this property in a newly created topic
   ?
   3. How do I change it (what is the name of the property to be set) ?
   4. Any idea why I face this problem in the cluster mode but not in the
   local mode ?

BTW I am using 0.10.1.

Any help / pointer will be appreciated ?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to