I will check out all the options you mentioned. I am sure that on my local setup it's all 0.10.0, so no wonder it works correctly. In the cluster, I just checked that the version of Kafka that ships with DC/OS 1.8 (the version I get with "dcos package install kafka") is 0.9.0. Regarding:

> In case you do have 0.10 brokers, it might however happen that
> bin/kafka-console-producer.sh does use a 0.9 producer.

How can I check this? Thanks.

On Sat, Oct 29, 2016 at 12:12 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> That sounds reasonable. However, I am wondering how your Streams
> application can connect to a 0.9 broker in the first place. Streams
> internally uses the standard Kafka clients, and those are not backward
> compatible. Thus, the 0.10 Streams clients should not be able to
> connect to a 0.9 broker.
>
> In case you do have 0.10 brokers, it might however happen that
> bin/kafka-console-producer.sh does use a 0.9 producer. Brokers are
> backward compatible, thus a 0.9 producer can write to a 0.10 broker
> (and in this case the record TS would be invalid). I assume that in
> your local environment you are using the 0.10
> bin/kafka-console-producer.sh, and thus all works fine.
>
> -Matthias
>
> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>> Hello Matthias -
>>
>> Thanks a lot for the response. I think what may be happening is a
>> version mismatch between the development & deployment versions of
>> Kafka. The Kafka Streams application that I developed uses
>> 0.10.0-based libraries, and my local environment contains a server
>> installation of the same version. Hence it works OK in my local
>> environment.
>>
>> But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service through
>> the DC/OS CLI, and I use this version to load records into the input
>> topic, then try to consume them using the deployed Streams
>> application, which I developed against 0.10.0. Hence the producer
>> did not put in the timestamp, while the consumer expects to have one.
>>
>> I need to check if 0.10.x is available for DC/OS ..
>>
>> Thanks again for your suggestions.
>>
>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
>> <matth...@confluent.io> wrote:
>>> Hey,
>>>
>>> we just added a new FAQ entry for the upcoming CP 3.2 release that
>>> answers your question. I just c&p it here; a more concrete answer
>>> is below.
>>>
>>> If you get an exception similar to the one shown below, there are
>>> multiple possible causes:
>>>
>>>     Exception in thread "StreamThread-1"
>>>     java.lang.IllegalArgumentException: Invalid timestamp -1
>>>         at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>
>>> This error means that the timestamp extractor of your Kafka Streams
>>> application failed to extract a valid timestamp from a record.
>>> Typically, this points to a problem with the record (e.g., the
>>> record does not contain a timestamp at all), but it could also
>>> indicate a problem or bug in the timestamp extractor used by the
>>> application.
>>>
>>> When does a record not contain a valid timestamp:
>>>
>>> If you are using the default ConsumerRecordTimestampExtractor, it
>>> is most likely that your records do not carry an embedded timestamp
>>> (embedded record timestamps were introduced in Kafka's message
>>> format in Kafka 0.10). This might happen if you consume a topic
>>> that is written to by old Kafka producer clients (i.e., version 0.9
>>> or earlier) or by third-party producer clients. A common situation
>>> where this may happen is after upgrading your Kafka cluster from
>>> 0.9 to 0.10, where all the data that was generated with 0.9 is not
>>> compatible with the 0.10 message format.
>>>
>>> If you are using a custom timestamp extractor, make sure that your
>>> extractor is robust to missing timestamps in your records. For
>>> example, you can return a default or an estimated timestamp if you
>>> cannot extract a valid timestamp (maybe the timestamp field in your
>>> data is just missing). You can also switch to processing-time
>>> semantics via WallclockTimestampExtractor; whether such a fallback
>>> is an appropriate response to this situation depends on your use
>>> case. However, as a first step you should identify and fix the root
>>> cause for why such problematic records were written to Kafka in the
>>> first place. In a second step you may consider applying workarounds
>>> (as described above) when dealing with such records (for example,
>>> if you need to process those records after all). Another option is
>>> to regenerate the records with correct timestamps and write them to
>>> a new Kafka topic.
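The fallback described in the FAQ above could look roughly like the following sketch. It is written against the 0.10.0 TimestampExtractor interface (newer Kafka versions use a different extract() signature); the class name and the choice of wall-clock time as the fallback are illustrative, not part of the FAQ.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class FallbackTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            long ts = record.timestamp();
            // Records written by pre-0.10 producers carry no embedded
            // timestamp and surface here as -1.
            if (ts < 0) {
                // Fall back to processing time; alternatively, parse an
                // estimated timestamp out of record.value().
                return System.currentTimeMillis();
            }
            return ts;
        }
    }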
>>> When the timestamp extractor causes the problem:
>>>
>>> In this situation you should debug and fix the erroneous extractor.
>>> If the extractor is built into Kafka, please report the bug to the
>>> Kafka developer mailing list at d...@kafka.apache.org (see
>>> http://kafka.apache.org/contact for instructions); in the meantime,
>>> you may write a custom timestamp extractor that fixes the problem
>>> and configure your application to use that extractor for the time
>>> being.
>>>
>>> To address your questions more concretely:
>>>
>>> 1. Yes and no: yes for any new data you write to your topic; no for
>>>    any already written data that does not have a valid timestamp
>>>    set.
>>> 2. The default is CreateTime.
>>> 3. The config parameter is "message.timestamp.type". It is a
>>>    broker-side, per-topic setting (however, be aware that the Java
>>>    KafkaProducer does verify the timestamp locally before sending
>>>    the message to the broker, thus on -1 there will be the
>>>    client-side exception you observed).
>>> 4. I assume that you consume different topics with different TS
>>>    fields in your records.
>>>
>>> Also have a look at:
>>> http://docs.confluent.io/current/streams/concepts.html#time
>>>
>>> -Matthias
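To make point 3 concrete: message.timestamp.type is set on the topic itself (for example via bin/kafka-configs.sh --zookeeper <host:port> --alter --entity-type topics --entity-name <topic> --add-config message.timestamp.type=LogAppendTime), while the timestamp extractor is a client-side Streams setting. Wiring in a custom extractor such as the one sketched above might look like this; a minimal sketch against the 0.10.0 StreamsConfig, where the application id and broker address are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsConfigSketch {
        public static Properties buildConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "csv-transform");  // placeholder id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder address
            // Replace the default ConsumerRecordTimestampExtractor with the
            // fallback extractor sketched earlier ("timestamp.extractor" is
            // the underlying property name in 0.10.0).
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      FallbackTimestampExtractor.class.getName());
            return props;
        }
    }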
>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the
>>>> last mail. And I am using Kafka within a DC/OS cluster on AWS.
>>>>
>>>> The setup that I mentioned works OK on my local machine, using a
>>>> local Kafka installation. And it works in both the single-broker
>>>> and the multi-broker scenario.
>>>>
>>>> Thanks.
>>>>
>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
>>>> <ghosh.debas...@gmail.com> wrote:
>>>>> Hello -
>>>>>
>>>>> I am a beginner in Kafka .. with my first Kafka Streams
>>>>> application ..
>>>>>
>>>>> I have a Streams application that reads from a topic, does some
>>>>> transformation on the data and writes to another topic. The
>>>>> records that I manipulate are CSV records.
>>>>>
>>>>> It runs fine when I run it on a local Kafka instance.
>>>>>
>>>>> However, when I run it on an AWS cluster, I get the following
>>>>> exception when I try to produce the transformed record into the
>>>>> target topic:
>>>>>
>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>>
>>>>> It looks like the timestamp passed to the ProducerRecord is -1,
>>>>> though I am not passing any timestamp explicitly. I am not sure
>>>>> why this happens, but I see that the Javadoc for ProducerRecord
>>>>> says the following ..
>>>>>
>>>>>> The record also has an associated timestamp. If the user did not
>>>>>> provide a timestamp, the producer will stamp the record with its
>>>>>> current time. The timestamp eventually used by Kafka depends on
>>>>>> the timestamp type configured for the topic. If the topic is
>>>>>> configured to use CreateTime, the timestamp in the producer
>>>>>> record will be used by the broker. If the topic is configured to
>>>>>> use LogAppendTime, the timestamp in the producer record will be
>>>>>> overwritten by the broker with the broker local time when it
>>>>>> appends the message to its log. In either of the cases above,
>>>>>> the timestamp that has actually been used will be returned to
>>>>>> the user in RecordMetadata.
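The behavior the Javadoc describes can be observed directly by sending a record without an explicit timestamp and inspecting the timestamp returned in RecordMetadata. Below is a minimal sketch against the 0.10 producer API; the broker address, topic name, and payload are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class TimestampProbe {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092"); // placeholder address
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // No timestamp is passed: a 0.10 producer stamps the record with
            // the current time (a 0.9 producer cannot, hence the -1).
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("target-topic", "key", "a,b,c");
            RecordMetadata metadata = producer.send(record).get();
            // With CreateTime this is the producer's timestamp; with
            // LogAppendTime it is the broker's local time at append.
            System.out.println("timestamp used: " + metadata.timestamp());
            producer.close();
        }
    }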
>>>>> 1. Will this problem be solved if I configure the topic with
>>>>>    LogAppendTime or CreateTime explicitly?
>>>>> 2. What is the default setting of this property on a newly
>>>>>    created topic?
>>>>> 3. How do I change it (what is the name of the property to be
>>>>>    set)?
>>>>> 4. Any idea why I face this problem in cluster mode but not in
>>>>>    local mode?
>>>>>
>>>>> BTW I am using 0.10.1.
>>>>>
>>>>> Any help / pointer will be appreciated.
>>>>>
>>>>> regards.
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg