I am actually using 0.10.0, NOT 0.10.1 as I said in my last mail. And I am running Kafka in a DC/OS cluster on AWS.
The version that I mentioned works OK is the one on my local machine, using a local Kafka installation. There it works for both the single-broker and the multi-broker scenario. Thanks.

On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hello -
>
> I am a beginner in Kafka, with my first Kafka Streams application.
>
> I have a streams application that reads from a topic, does some
> transformation on the data and writes to another topic. The record that I
> manipulate is a CSV record.
>
> It runs fine when I run it on a local Kafka instance.
>
> However, when I run it on an AWS cluster, I get the following exception
> when I try to produce the transformed record into the target topic:
>
> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> It looks like the timestamp passed to the ProducerRecord is -1, though I am
> not passing any timestamp explicitly. I am not sure why this happens, but I
> see that the Javadoc for ProducerRecord says the following:
>
>> The record also has an associated timestamp. If the user did not provide a
>> timestamp, the producer will stamp the record with its current time. The
>> timestamp eventually used by Kafka depends on the timestamp type configured
>> for the topic.
>> If the topic is configured to use CreateTime, the timestamp in the
>> producer record will be used by the broker.
>> If the topic is configured to use LogAppendTime, the timestamp in the
>> producer record will be overwritten by the broker with the broker local
>> time when it appends the message to its log.
>> In either of the cases above, the timestamp that has actually been used
>> will be returned to user in RecordMetadata.
>
> 1. Will this problem be solved if I configure the topic with
>    LogAppendTime or CreateTime explicitly?
> 2. What is the default setting of this property on a newly created
>    topic?
> 3. How do I change it (what is the name of the property to be set)?
> 4. Any idea why I face this problem in cluster mode but not in
>    local mode?
>
> BTW, I am using 0.10.1.
>
> Any help / pointers will be appreciated.
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
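[Editor's note: a minimal sketch of one workaround that matches this symptom. A timestamp of -1 typically means the source records carry no embedded timestamp (e.g. they were written by a pre-0.10 producer client). In Kafka Streams 0.10.x you can tell the application to fall back to wall-clock time via the `timestamp.extractor` config (`StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG`), pointing it at the built-in `WallclockTimestampExtractor`. The application id and broker address below are placeholders, not values from the thread.]

```java
import java.util.Properties;

// Sketch: Streams configuration that avoids "Invalid timestamp -1" by
// extracting wall-clock time instead of the (missing) record timestamp.
public class TimestampFixSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Placeholder values -- substitute your own app id and brokers.
        props.put("application.id", "csv-transformer");           // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "broker.dcos.local:9092"); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // Fall back to wall-clock time for records whose embedded
        // timestamp is absent (-1); StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
        props.put("timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return props;
    }
}
```

On the topic-level questions: the relevant broker/topic property is `message.timestamp.type`, which defaults to `CreateTime`; it can be switched to `LogAppendTime` per topic, e.g. with `kafka-configs.sh --alter --entity-type topics --entity-name <topic> --add-config message.timestamp.type=LogAppendTime`. Note, however, that `LogAppendTime` only governs what the broker stores; it does not by itself stop the Streams producer from rejecting a -1 timestamp before the record reaches the broker, which is why the extractor-side fix above is the one to try first.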