-----BEGIN PGP SIGNED MESSAGE----- Hash: SHA512 The simplest way should be to check the java classpath.
Insert an echo $CLASSPATH at the end of bin/kafka-run-class.sh Than run bin/kafka-console-producer.sh with no argument. You should see the classpath be printed out. Look for 'kafka-clients-XXX.jar' -- XXX will be the version number. - -Matthias On 10/29/16 12:11 AM, Debasish Ghosh wrote: > Hello Mathias - > > Regarding .. > > In case you do have 0.10 brokers, it might however happen, that > bin/kafka-console-producer.sh >> does use 0.9 producer. > > > How can I check this ? > > Thanks! > > On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh > <ghosh.debas...@gmail.com> wrote: > >> I agree .. the problem is DC/OS still ships the older version. >> Let me check if I can upgrade this .. >> >> Thanks! >> >> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax >> <matth...@confluent.io> wrote: >> > Btw: I would highly recommend to use Kafka 0.10.1 -- there are > many new Streams feature and usability improvements and bug fixes. > > -Matthias > > On 10/28/16 11:42 PM, Matthias J. Sax wrote: >>>>> That sounds reasonable. However, I am wondering how your >>>>> Streams application can connect to 0.9 broker in the first >>>>> place. Streams internally uses standard Kafka clients, and >>>>> those are not backward compatible. Thus, the 0.10 Streams >>>>> clients should not be able to connect to 0.9 broker. >>>>> >>>>> In case you do have 0.10 brokers, it might however happen, >>>>> that bin/kafka-console-producer.sh does use 0.9 producer. >>>>> Broker are backward compatible, thus, a 0.9 producer can >>>>> write to 0.10 broker (and in this case record TS would be >>>>> invalid). While I assume that in you local environment you >>>>> are using 0.10 bin/kafka-console-produer.sh and thus all >>>>> works fine. >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> >>>>> On 10/28/16 11:00 PM, Debasish Ghosh wrote: >>>>>> Hello Mathias - >>>>> >>>>>> Thanks a lot for the response. I think what may be >>>>>> happening is a version mismatch between the development & >>>>>> deployment versions of Kafka. The Kafka streams >>>>>> application that I developed uses 0.10.0 based libraries. >>>>>> And my local environment contains a server installation >>>>>> of the same version. Hence it works ok in my local >>>>>> environment. >>>>> >>>>>> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install >>>>>> the service through DC/OS cli. And I use this version to >>>>>> load records into the input topic. And try to consume >>>>>> using the deployed streams application which I developed >>>>>> using 0.10.0. Hence the producer did not put the >>>>>> timestamp while the consumer expects to have one. >>>>> >>>>>> I need to check if 0.10.x is available for DC/OS .. >>>>> >>>>>> Thanks again for your suggestions. >>>>> >>>>> >>>>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax >>>>>> <matth...@confluent.io> wrote: >>>>> >>>>>> Hey, >>>>> >>>>>> we just added a new FAQ entry for upcoming CP 3.2 release >>>>>> that answers your question. I just c&p it here. More >>>>>> concrete answer below. >>>>> >>>>>>>>> If you get an exception similar to the one shown >>>>>>>>> below, there are multiple possible causes: >>>>>>>>> >>>>>>>>> Exception in thread "StreamThread-1" >>>>>>>>> java.lang.IllegalArgumentException: Invalid >>>>>>>>> timestamp -1 at >>>>>>>>> >>>>>>>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(Produc erRe > >>>>>>>>> c >>>>> >>>>>>>>> > ord >>>>> >>>>>>>>> >>>>> .java:60) >>>>>>>>> >>>>>>>>> This error means that the timestamp extractor of >>>>>>>>> your Kafka Streams application failed to extract a >>>>>>>>> valid timestamp from a record. Typically, this >>>>>>>>> points to a problem with the record (e.g., the >>>>>>>>> record does not contain a timestamp at all), but it >>>>>>>>> could also indicate a problem or bug in the >>>>>>>>> timestamp extractor used by the application. >>>>>>>>> >>>>>>>>> When does a record not contain a valid timestamp: >>>>>>>>> >>>>>>>>> If you are using the default >>>>>>>>> ConsumerRecordTimestampExtractor, it is most likely >>>>>>>>> that your records do not carry an embedded >>>>>>>>> timestamp (embedded record timestamps got >>>>>>>>> introduced in Kafka's message format in Kafka >>>>>>>>> 0.10). This might happen, if you consume a topic >>>>>>>>> that is written by old Kafka producer clients (ie, >>>>>>>>> version 0.9 or earlier) or third party producer >>>>>>>>> clients. A common situation where this may happen >>>>>>>>> is after upgrading your Kafka cluster from 0.9 to >>>>>>>>> 0.10, where all the data that was generated with >>>>>>>>> 0.9 is not compatible with the 0.10 message format. >>>>>>>>> If you are using a custom timestamp extractor, >>>>>>>>> make sure that your extractor is robust to missing >>>>>>>>> timestamps in your records. For example, you can >>>>>>>>> return a default or estimated timestamp if you >>>>>>>>> cannot extract a valid timestamp (maybe the >>>>>>>>> timstamp field in your data is just missing). You >>>>>>>>> can also switch to processing time semantics via >>>>>>>>> WallclockTimestampExtractor; whether such a >>>>>>>>> fallback is an appropriate response to this >>>>>>>>> situation depends on your use case. However, as a >>>>>>>>> first step you should identify and fix the root >>>>>>>>> cause for why such problematic records were written >>>>>>>>> to Kafka in the first place. In a second step you >>>>>>>>> may consider applying workarounds (as described >>>>>>>>> above) when dealing with such records (for example, >>>>>>>>> if you need to process those records after all). >>>>>>>>> Another option is to regenerate the records with >>>>>>>>> correct timestamps and write them to a new Kafka >>>>>>>>> topic. >>>>>>>>> >>>>>>>>> When the timestamp extractor causes the problem: >>>>>>>>> >>>>>>>>> In this situation you should debug and fix the >>>>>>>>> erroneous extractor. If the extractor is built into >>>>>>>>> Kafka, please report the bug to the Kafka developer >>>>>>>>> mailing list at d...@kafka.apache.org (see >>>>>>>>> instructions http://kafka.apache.org/contact); in >>>>>>>>> the meantime, you may write a custom timestamp >>>>>>>>> extractor that fixes the problem and configure your >>>>>>>>> application to use that extractor for the time >>>>>>>>> being. >>>>> >>>>> >>>>>> To address you questions more concretely: >>>>> >>>>>> 1. Yes an no: Yes, for any new data you write to you >>>>>> topic. No, for any already written data that does not >>>>>> have a valid timestamp set 2. Default is creating time 3. >>>>>> Config parameter "message.timestamp.type") It's a broker >>>>>> side per topic setting (however, be aware that Java >>>>>> KafkaProducer does verify the timestamp locally before >>>>>> sending the message to the broker, thus on -1 there will >>>>>> be the client side exception you did observe( 4. I assume >>>>>> that you do consumer different topic with different TS >>>>>> fields in you records. >>>>> >>>>>> Also have a look at: >>>>>> http://docs.confluent.io/current/streams/concepts.html#time >>>>> >>>>> >>>>> >>>>>> >>>>>> - -Matthias >>>>> >>>>> >>>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote: >>>>>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I >>>>>>>>> mentioned in the last mail. And I am using Kafka >>>>>>>>> within a DC/OS cluster under AWS. >>>>>>>>> >>>>>>>>> The version that I mentioned works ok is on my >>>>>>>>> local machine using a local Kafka installation. And >>>>>>>>> it works for both single broker and multi broker >>>>>>>>> scenario. >>>>>>>>> >>>>>>>>> Thanks. >>>>>>>>> >>>>>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh >>>>>>>>> <ghosh.debas...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hello - >>>>>>>>>> >>>>>>>>>> I am a beginner in Kafka .. with my first Kafka >>>>>>>>>> streams application .. >>>>>>>>>> >>>>>>>>>> I have a streams application that reads from a >>>>>>>>>> topic, does some transformation on the data and >>>>>>>>>> writes to another topic. The record that I >>>>>>>>>> manipulate is a CSV record. >>>>>>>>>> >>>>>>>>>> It runs fine when I run it on a local Kafka >>>>>>>>>> instance. >>>>>>>>>> >>>>>>>>>> However when I run it on an AWS cluster, I get >>>>>>>>>> the following exception when I try to produce >>>>>>>>>> the transformed record into the target topic. >>>>>>>>>> >>>>>>>>>> Exception in thread "StreamThread-1" >>>>>>>>>> java.lang.IllegalArgumentException: Invalid >>>>>>>>>> timestamp -1 at >>>>>>>>>> org.apache.kafka.clients.producer.ProducerRecord.<init> >>>>>>>>>> >>>>>>>>>> (ProducerRecord.java:60) at >>>>>>>>>> org.apache.kafka.streams.processor.internals.SinkNode. >>>>>>>>>> >>>>>>>>>> process(SinkNode.java:72) at >>>>>>>>>> org.apache.kafka.streams.processor.internals. >>>>>>>>>> StreamTask.forward(StreamTask.java:338) at >>>>>>>>>> org.apache.kafka.streams.processor.internals. >>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187) >>>>>>>>>> >>>>>>>>>> > >>>>>>>>>> at >>>>>>>>>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$ >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> > >>>>>>>>>> KStreamMapProcessor.process(KStreamMapValues.java:42) at >>>>>>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.pr oces > >>>>>>>>>> s >>>>> >>>>>>>>>> > ( >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> ProcessorNode.java:68) >>>>>>>>>> at org.apache.kafka.streams.processor.internals. >>>>>>>>>> StreamTask.forward(StreamTask.java:338) at >>>>>>>>>> org.apache.kafka.streams.processor.internals. >>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187) >>>>>>>>>> >>>>>>>>>> > >>>>>>>>>> at >>>>>>>>>> org.apache.kafka.streams.kstream.internals.KStreamPassThrough $ >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> > >>>>>>>>>> KStreamPassThroughProcessor.process(KStreamPassThrough.java:34) >>>>>>>>>> at >>>>>>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.pr oces > >>>>>>>>>> s >>>>> >>>>>>>>>> > ( >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> ProcessorNode.java:68) >>>>>>>>>> at org.apache.kafka.streams.processor.internals. >>>>>>>>>> StreamTask.forward(StreamTask.java:351) at >>>>>>>>>> org.apache.kafka.streams.processor.internals. >>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:192) >>>>>>>>>> >>>>>>>>>> > >>>>>>>>>> at >>>>>>>>>> org.apache.kafka.streams.kstream.internals.KStreamBranch$ >>>>>>>>>> >>>>>>>>>> KStreamBranchProcessor.process(KStreamBranch.java:46) at >>>>>>>>>> >>>>>>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.pr oces > >>>>>>>>>> s >>>>> >>>>>>>>>> > ( >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> ProcessorNode.java:68) >>>>>>>>>> at org.apache.kafka.streams.processor.internals. >>>>>>>>>> StreamTask.forward(StreamTask.java:338) at >>>>>>>>>> org.apache.kafka.streams.processor.internals. >>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187) >>>>>>>>>> >>>>>>>>>> > >>>>>>>>>> at >>>>>>>>>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$ >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> > >>>>>>>>>> KStreamMapProcessor.process(KStreamMapValues.java:42) at >>>>>>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.pr oces > >>>>>>>>>> s >>>>> >>>>>>>>>> > ( >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> ProcessorNode.java:68) >>>>>>>>>> at org.apache.kafka.streams.processor.internals. >>>>>>>>>> StreamTask.forward(StreamTask.java:338) at >>>>>>>>>> org.apache.kafka.streams.processor.internals. >>>>>>>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187) >>>>>>>>>> >>>>>>>>>> > >>>>>>>>>> at org.apache.kafka.streams.processor.internals. >>>>>>>>>> SourceNode.process(SourceNode.java:64) at >>>>>>>>>> org.apache.kafka.streams.processor.internals. >>>>>>>>>> StreamTask.process(StreamTask.java:174) at >>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.run Loop > >>>>>>>>>> ( >>>>>>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>>>>>>> >>>>> StreamThread.java:320) >>>>>>>>>> at org.apache.kafka.streams.processor.internals. >>>>>>>>>> StreamThread.run(StreamThread.java:218) >>>>>>>>>> >>>>>>>>>> Looks like the timestamp passed to the >>>>>>>>>> ProducerRecord is -1, though I am not passing any >>>>>>>>>> timestamp explicitly. I am not sure why this >>>>>>>>>> happens. But I see that the Javadoc for >>>>>>>>>> ProducerRecord says the following .. >>>>>>>>>> >>>>>>>>>> The record also has an associated timestamp. If >>>>>>>>>> the user did not provide a >>>>>>>>>>> timestamp, the producer will stamp the record >>>>>>>>>>> with its current time. The timestamp eventually >>>>>>>>>>> used by Kafka depends on the timestamp type >>>>>>>>>>> configured for the topic. If the topic is >>>>>>>>>>> configured to use CreateTime, the timestamp in >>>>>>>>>>> the producer record will be used by the broker. >>>>>>>>>>> If the topic is configured to use >>>>>>>>>>> LogAppendTime, the timestamp in the producer >>>>>>>>>>> record will be overwritten by the broker with >>>>>>>>>>> the broker local time when it appends the >>>>>>>>>>> message to its log. In either of the cases >>>>>>>>>>> above, the timestamp that has actually been >>>>>>>>>>> used will be returned to user in >>>>>>>>>>> RecordMetadata >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> 1. Will this problem be solved if I configure the >>>>>>>>>> topic with LogAppendTime or CreateTime explicitly >>>>>>>>>> ? 2. What is the default setting of this property >>>>>>>>>> in a newly created topic ? 3. How do I change it >>>>>>>>>> (what is the name of the property to be set) ? 4. >>>>>>>>>> Any idea why I face this problem in the cluster >>>>>>>>>> mode but not in the local mode ? >>>>>>>>>> >>>>>>>>>> BTW I am using 0.10.1. >>>>>>>>>> >>>>>>>>>> Any help / pointer will be appreciated ? >>>>>>>>>> >>>>>>>>>> regards. >>>>>>>>>> >>>>>>>>>> -- Debasish Ghosh http://manning.com/ghosh2 >>>>>>>>>> http://manning.com/ghosh >>>>>>>>>> >>>>>>>>>> Twttr: @debasishg Blog: >>>>>>>>>> http://debasishg.blogspot.com Code: >>>>>>>>>> http://github.com/debasishg >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>> >>>>> >>>>> >>>>> >>>>> >>> >> >> >> >> -- Debasish Ghosh http://manning.com/ghosh2 >> http://manning.com/ghosh >> >> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: >> http://github.com/debasishg >> > > > -----BEGIN PGP SIGNATURE----- Comment: GPGTools - https://gpgtools.org iQIcBAEBCgAGBQJYFaONAAoJECnhiMLycopPH/4P+gOUVW4Ab6UBlrRiap1snDkC 1xNaWIzlyF2i8nZ+FUTmXrvJiF1TSSw8f+apyZpjf+Q8uMS1Bv6ZzqqiHVC0+gFb Ymis+pOHhEl3je5uJWf41emrUxvJHalDlrLqLCk0cxlTYgBCgoAxLtzbvFrejw0e uYcfjz+mERK4upNZS3KbO8tMMpr+M163u02dUAhT6kuaJNfSICNKKEVIK9WrCAMN skhrVhcM6XRh6YisU73erg2grcGAMTxYf53eWt8saRqICMlcmxDbTHSxJ6Qog4l4 c6OSxxtnB+rXYSDEtoWH3CSxkPK0zlLUDcKXqz4bUNrWgoaCjAfz2WuANNskUi3L dZlfO+vvbS2NLjqNFzqVUjV5tnbdCL7MTO4ByQZ7Jh9TQeOyMkHW8+fGsCZUJ3Ex SIx95MYJyOk1n390yFjjeJEQT/yHIq7nXXgxjJ6dBPfEIB6VwXOHGvaucXMus2XF ioBr/CuoNhWTfHn19TNSkSObJP61W2YCA1xHlSzVutHoISKHZKRq0z6AbbLdD7Z3 weLV5zHrnmibeKDYc+OX+60Kr+YsgSqeNwDx+EVFjkMTagUydzg06PcWigSLl6ZC 2rxs2Esb0W+Q4Iy+IYCzfQuQGQz5oSiUB32hxTio6dDz8otBY6+U3QrBz7mOtc+v OeP4OeKsbMBBZGJNa0Ux =NjV2 -----END PGP SIGNATURE-----