Hello - I am a beginner in Kafka .. with my first Kafka streams application ..
I have a streams application that reads from a topic, does some transformation on the data and writes to another topic. The record that I manipulate is a CSV record. It runs fine when I run it on a local Kafka instance. However when I run it on an AWS cluster, I get the following exception when I try to produce the transformed record into the target topic. Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1 at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192) at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) Looks like the timestamp passed to the ProducerRecord is -1, though I am not passing any timestamp explicitly. I am not sure why this happens. But I see that the Javadoc for ProducerRecord says the following .. The record also has an associated timestamp. If the user did not provide a > timestamp, the producer will stamp the record with its current time. The > timestamp eventually used by Kafka depends on the timestamp type configured > for the topic. > If the topic is configured to use CreateTime, the timestamp in the > producer record will be used by the broker. > If the topic is configured to use LogAppendTime, the timestamp in the > producer record will be overwritten by the broker with the broker local > time when it appends the message to its log. > In either of the cases above, the timestamp that has actually been used > will be returned to user in RecordMetadata 1. Will this problem be solved if I configure the topic with LogAppendTime or CreateTime explicitly ? 2. What is the default setting of this property in a newly created topic ? 3. How do I change it (what is the name of the property to be set) ? 4. Any idea why I face this problem in the cluster mode but not in the local mode ? BTW I am using 0.10.1. Any help / pointer will be appreciated ? regards. -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg