Hello Matthias - Thanks a lot for the response. I think what may be happening is a version mismatch between the development and deployment versions of Kafka. The Kafka Streams application that I developed uses the 0.10.0 libraries, and my local environment runs a server installation of the same version, which is why everything works in my local environment.

But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service through the DC/OS CLI. I use this 0.9.0 installation to load records into the input topic, and then try to consume them with the deployed Streams application that I built against 0.10.0. Hence the producer did not attach a timestamp, while the consumer expects to find one. I need to check whether 0.10.x is available for DC/OS. Thanks again for your suggestions.
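Until the brokers can be upgraded, a defensive timestamp extractor along the lines below might let the 0.10.0 Streams app tolerate the timestamp-less 0.9 records. This is only a minimal sketch: the class name is illustrative, and it assumes the single-argument extract() signature of the TimestampExtractor interface as it exists in 0.10.x.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class FallbackTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            long ts = record.timestamp();
            // Records written by pre-0.10 producers carry no embedded
            // timestamp and report -1 here; fall back to wall-clock time.
            return ts >= 0 ? ts : System.currentTimeMillis();
        }
    }

The extractor would then be registered through the "timestamp.extractor" property (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG) of the Streams application.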
On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hey,
>
> we just added a new FAQ entry for the upcoming CP 3.2 release that
> answers your question. I'll just copy-and-paste it here; more concrete
> answers are below.
>
> > If you get an exception similar to the one shown below, there are
> > multiple possible causes:
> >
> > Exception in thread "StreamThread-1"
> > java.lang.IllegalArgumentException: Invalid timestamp -1
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >
> > This error means that the timestamp extractor of your Kafka Streams
> > application failed to extract a valid timestamp from a record.
> > Typically, this points to a problem with the record (e.g., the record
> > does not contain a timestamp at all), but it could also indicate a
> > problem or bug in the timestamp extractor used by the application.
> >
> > When does a record not contain a valid timestamp?
> >
> > If you are using the default ConsumerRecordTimestampExtractor, it is
> > most likely that your records do not carry an embedded timestamp
> > (embedded record timestamps were introduced in Kafka's message format
> > in Kafka 0.10). This might happen if you consume a topic that is
> > written to by old Kafka producer clients (i.e., version 0.9 or
> > earlier) or by third-party producer clients. A common situation where
> > this may happen is after upgrading your Kafka cluster from 0.9 to
> > 0.10, where all the data that was generated with 0.9 is not
> > compatible with the 0.10 message format. If you are using a custom
> > timestamp extractor, make sure that your extractor is robust to
> > missing timestamps in your records. For example, you can return a
> > default or estimated timestamp if you cannot extract a valid
> > timestamp (maybe the timestamp field in your data is just missing).
> > You can also switch to processing-time semantics via
> > WallclockTimestampExtractor; whether such a fallback is an
> > appropriate response to this situation depends on your use case.
> > However, as a first step you should identify and fix the root cause
> > for why such problematic records were written to Kafka in the first
> > place. In a second step you may consider applying workarounds (as
> > described above) when dealing with such records (for example, if you
> > need to process those records after all). Another option is to
> > regenerate the records with correct timestamps and write them to a
> > new Kafka topic.
> >
> > When the timestamp extractor causes the problem:
> >
> > In this situation you should debug and fix the erroneous extractor.
> > If the extractor is built into Kafka, please report the bug to the
> > Kafka developer mailing list at d...@kafka.apache.org (see
> > instructions at http://kafka.apache.org/contact); in the meantime,
> > you may write a custom timestamp extractor that fixes the problem and
> > configure your application to use that extractor for the time being.
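> As a concrete illustration of the processing-time fallback mentioned
> above, switching to WallclockTimestampExtractor is a one-line config
> change (a sketch only; the constant maps to the "timestamp.extractor"
> property in 0.10.x, and WallclockTimestampExtractor lives in
> org.apache.kafka.streams.processor):
>
>     Properties props = new Properties();
>     // ... other Streams configs (application.id, bootstrap.servers, ...)
>     props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>               WallclockTimestampExtractor.class.getName());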
> To address your questions more concretely:
>
> 1. Yes and no: yes, for any new data you write to the topic; no, for
>    any already-written data that does not have a valid timestamp set.
> 2. The default is CreateTime.
> 3. The config parameter is "message.timestamp.type". It is a
>    broker-side, per-topic setting. (However, be aware that the Java
>    KafkaProducer does verify the timestamp locally before sending the
>    message to the broker, thus on -1 there will be the client-side
>    exception you did observe.)
> 4. I assume that you consume different topics with different timestamp
>    fields in your records.
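> For example, an existing topic could be switched to broker timestamps
> from the command line; this is a sketch only, with the ZooKeeper host
> and topic name as placeholders:
>
>     bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
>       --topic my-topic --config message.timestamp.type=LogAppendTime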
> Also have a look at:
> http://docs.confluent.io/current/streams/concepts.html#time
>
> -Matthias
>
> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> > I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the last
> > mail. And I am using Kafka within a DC/OS cluster under AWS.
> >
> > The version that I mentioned works OK on my local machine using a
> > local Kafka installation. And it works for both the single-broker
> > and multi-broker scenarios.
> >
> > Thanks.
> >
> > On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> > <ghosh.debas...@gmail.com> wrote:
> >
> >> Hello -
> >>
> >> I am a beginner in Kafka, with my first Kafka Streams application.
> >>
> >> I have a streams application that reads from a topic, does some
> >> transformation on the data, and writes to another topic. The record
> >> that I manipulate is a CSV record.
> >>
> >> It runs fine when I run it on a local Kafka instance.
> >>
> >> However, when I run it on an AWS cluster, I get the following
> >> exception when I try to produce the transformed record into the
> >> target topic.
> >>
> >> Exception in thread "StreamThread-1"
> >> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>
> >> Looks like the timestamp passed to the ProducerRecord is -1, though
> >> I am not passing any timestamp explicitly. I am not sure why this
> >> happens, but I see that the Javadoc for ProducerRecord says the
> >> following:
> >>
> >>> The record also has an associated timestamp. If the user did not
> >>> provide a timestamp, the producer will stamp the record with its
> >>> current time. The timestamp eventually used by Kafka depends on
> >>> the timestamp type configured for the topic. If the topic is
> >>> configured to use CreateTime, the timestamp in the producer record
> >>> will be used by the broker. If the topic is configured to use
> >>> LogAppendTime, the timestamp in the producer record will be
> >>> overwritten by the broker with the broker local time when it
> >>> appends the message to its log. In either of the cases above, the
> >>> timestamp that has actually been used will be returned to the user
> >>> in RecordMetadata.
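> >> For what it's worth, the 0.10 ProducerRecord constructor also
> >> accepts an explicit timestamp; a sketch with placeholder topic,
> >> key, and value (the null partition lets the default partitioner
> >> pick one):
> >>
> >>     ProducerRecord<String, String> record = new ProducerRecord<>(
> >>         "my-topic", null, System.currentTimeMillis(), "key", "value");
> >>     producer.send(record);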
> >> 1. Will this problem be solved if I configure the topic with
> >>    LogAppendTime or CreateTime explicitly?
> >> 2. What is the default setting of this property in a newly created
> >>    topic?
> >> 3. How do I change it (what is the name of the property to be set)?
> >> 4. Any idea why I face this problem in the cluster mode but not in
> >>    the local mode?
> >>
> >> BTW I am using 0.10.1.
> >>
> >> Any help / pointer will be appreciated.
> >>
> >> regards.
> >>
> >> --
> >> Debasish Ghosh
> >> http://manning.com/ghosh2
> >> http://manning.com/ghosh
> >>
> >> Twttr: @debasishg
> >> Blog: http://debasishg.blogspot.com
> >> Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg