Hey,
we just added a new FAQ entry for the upcoming CP 3.2 release that answers your question. I'll copy and paste it here; more concrete answers are below.

> If you get an exception similar to the one shown below, there are
> multiple possible causes:
>
>   Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>       at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>
> This error means that the timestamp extractor of your Kafka Streams
> application failed to extract a valid timestamp from a record.
> Typically, this points to a problem with the record (e.g., the record
> does not contain a timestamp at all), but it could also indicate a
> problem or bug in the timestamp extractor used by the application.
>
> When does a record not contain a valid timestamp?
>
> If you are using the default ConsumerRecordTimestampExtractor, it is
> most likely that your records do not carry an embedded timestamp
> (embedded record timestamps were introduced in Kafka's message format
> in Kafka 0.10). This might happen if you consume a topic that is
> written by old Kafka producer clients (i.e., version 0.9 or earlier)
> or by third-party producer clients. A common situation where this may
> happen is after upgrading your Kafka cluster from 0.9 to 0.10: all
> the data that was generated with 0.9 is not compatible with the 0.10
> message format.
>
> If you are using a custom timestamp extractor, make sure that your
> extractor is robust to missing timestamps in your records. For
> example, you can return a default or estimated timestamp if you
> cannot extract a valid timestamp (maybe the timestamp field in your
> data is just missing). You can also switch to processing-time
> semantics via WallclockTimestampExtractor; whether such a fallback is
> an appropriate response to this situation depends on your use case.
>
> However, as a first step you should identify and fix the root cause
> for why such problematic records were written to Kafka in the first
> place. In a second step you may consider applying workarounds (as
> described above) when dealing with such records (for example, if you
> need to process those records after all). Another option is to
> regenerate the records with correct timestamps and write them to a
> new Kafka topic.
>
> When the timestamp extractor causes the problem:
>
> In this situation you should debug and fix the erroneous extractor.
> If the extractor is built into Kafka, please report the bug to the
> Kafka developer mailing list at d...@kafka.apache.org (see
> instructions at http://kafka.apache.org/contact); in the meantime,
> you may write a custom timestamp extractor that fixes the problem and
> configure your application to use that extractor for the time being.

To address your questions more concretely:

1. Yes and no: yes, for any new data you write to your topic; no, for any already-written data that does not have a valid timestamp set.

2. The default is CreateTime.

3. The config parameter is "message.timestamp.type". It's a broker-side, per-topic setting (however, be aware that the Java KafkaProducer does verify the timestamp locally before sending the message to the broker, thus on -1 there will be the client-side exception you observed).

4. I assume that you consume a different topic with different timestamp fields in your records.

Also have a look at: http://docs.confluent.io/current/streams/concepts.html#time

Two small sketches below illustrate the custom-extractor fallback from the FAQ and the client-side timestamp check from answer 3.
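First, the fallback idea from the FAQ. This is a minimal sketch, not code from the FAQ itself: it assumes the single-argument extract() signature of the TimestampExtractor interface as of 0.10.0/0.10.1, and the class name is made up.

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.streams.processor.TimestampExtractor;

  // Hypothetical extractor: falls back to wall-clock time whenever a
  // record carries no valid embedded timestamp (e.g., data written by
  // 0.9 or third-party producers).
  public class FallbackTimestampExtractor implements TimestampExtractor {

      @Override
      public long extract(final ConsumerRecord<Object, Object> record) {
          final long timestamp = record.timestamp();
          if (timestamp < 0) {
              // No valid embedded timestamp: use processing time instead
              // of letting -1 propagate into ProducerRecord and fail there.
              return System.currentTimeMillis();
          }
          return timestamp;
      }
  }

You would plug it in via StreamsConfig, e.g.

  props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            FallbackTimestampExtractor.class.getName());

(or point the same parameter at WallclockTimestampExtractor if pure processing-time semantics are acceptable for your use case).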
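Second, to make the client-side check from answer 3 concrete: the exception in your stack trace is thrown by the ProducerRecord constructor itself, before anything reaches the broker, so the broker-side timestamp type is never consulted for such a record. A minimal sketch (topic, key, and value are made up):

  import org.apache.kafka.clients.producer.ProducerRecord;

  public class TimestampCheckDemo {
      public static void main(final String[] args) {
          // Throws java.lang.IllegalArgumentException: Invalid timestamp -1
          // from ProducerRecord.<init>, exactly like the Streams sink does
          // when the timestamp extractor returns -1.
          new ProducerRecord<String, String>("demo-topic", null, -1L, "key", "value");
      }
  }

If you do want LogAppendTime for new data, the per-topic setting should be changeable via the kafka-configs tool (message.timestamp.type=LogAppendTime), but as shown above that would not rescue records that already fail this client-side validation.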
-Matthias

On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the last
> mail. And I am using Kafka within a DC/OS cluster under AWS.
>
> The version that I mentioned works OK is on my local machine using a
> local Kafka installation. And it works for both the single-broker and
> multi-broker scenarios.
>
> Thanks.
>
> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> <ghosh.debas...@gmail.com> wrote:
>
>> Hello -
>>
>> I am a beginner in Kafka, with my first Kafka Streams application.
>>
>> I have a streams application that reads from a topic, does some
>> transformation on the data, and writes to another topic. The record
>> that I manipulate is a CSV record.
>>
>> It runs fine when I run it on a local Kafka instance.
>>
>> However, when I run it on an AWS cluster, I get the following
>> exception when I try to produce the transformed record into the
>> target topic.
>>
>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>
>> It looks like the timestamp passed to the ProducerRecord is -1,
>> though I am not passing any timestamp explicitly. I am not sure why
>> this happens. But I see that the Javadoc for ProducerRecord says the
>> following:
>>
>>> The record also has an associated timestamp. If the user did not
>>> provide a timestamp, the producer will stamp the record with its
>>> current time. The timestamp eventually used by Kafka depends on the
>>> timestamp type configured for the topic. If the topic is configured
>>> to use CreateTime, the timestamp in the producer record will be
>>> used by the broker. If the topic is configured to use
>>> LogAppendTime, the timestamp in the producer record will be
>>> overwritten by the broker with the broker local time when it
>>> appends the message to its log. In either of the cases above, the
>>> timestamp that has actually been used will be returned to user in
>>> RecordMetadata
>>
>> 1. Will this problem be solved if I configure the topic with
>> LogAppendTime or CreateTime explicitly?
>> 2. What is the default setting of this property in a newly created
>> topic?
>> 3. How do I change it (what is the name of the property to be set)?
>> 4. Any idea why I face this problem in cluster mode but not in local
>> mode?
>>
>> BTW, I am using 0.10.1.
>>
>> Any help / pointers will be appreciated.
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg