I agree .. the problem is that DC/OS still ships the older version. Let me check if I can upgrade it ..
Thanks!

On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Btw: I would highly recommend using Kafka 0.10.1 -- there are many new
> Streams features, usability improvements, and bug fixes.
>
> -Matthias
>
> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
>> That sounds reasonable. However, I am wondering how your Streams
>> application can connect to a 0.9 broker in the first place. Streams
>> internally uses the standard Kafka clients, and those are not backward
>> compatible. Thus, the 0.10 Streams clients should not be able to
>> connect to a 0.9 broker.
>>
>> In case you do have 0.10 brokers, it might however happen that
>> bin/kafka-console-producer.sh uses a 0.9 producer. Brokers are
>> backward compatible, so a 0.9 producer can write to a 0.10 broker
>> (and in that case the record timestamp would be invalid). I assume
>> that in your local environment you are using the 0.10
>> bin/kafka-console-producer.sh, and thus everything works fine.
>>
>> -Matthias
>>
>> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>>> Hello Matthias -
>>>
>>> Thanks a lot for the response. I think what may be happening is a
>>> version mismatch between the development & deployment versions of
>>> Kafka. The Kafka Streams application that I developed uses
>>> 0.10.0-based libraries, and my local environment contains a server
>>> installation of the same version. Hence it works fine in my local
>>> environment.
>>>
>>> But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service through
>>> the DC/OS CLI. I use this version to load records into the input
>>> topic, and then try to consume them with the deployed Streams
>>> application that I developed against 0.10.0. Hence the producer did
>>> not put a timestamp on the records, while the consumer expects one.
>>>
>>> I need to check if 0.10.x is available for DC/OS ..
>>>
>>> Thanks again for your suggestions.
>>>
>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
>>> <matth...@confluent.io> wrote:
>>>> Hey,
>>>>
>>>> we just added a new FAQ entry for the upcoming CP 3.2 release that
>>>> answers your question. I just copy & paste it here; more concrete
>>>> answers below.
>>>>
>>>>> If you get an exception similar to the one shown below, there are
>>>>> multiple possible causes:
>>>>>
>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>
>>>>> This error means that the timestamp extractor of your Kafka
>>>>> Streams application failed to extract a valid timestamp from a
>>>>> record. Typically, this points to a problem with the record
>>>>> (e.g., the record does not contain a timestamp at all), but it
>>>>> could also indicate a problem or bug in the timestamp extractor
>>>>> used by the application.
>>>>>
>>>>> When does a record not contain a valid timestamp?
>>>>>
>>>>> If you are using the default ConsumerRecordTimestampExtractor, it
>>>>> is most likely that your records do not carry an embedded
>>>>> timestamp (embedded record timestamps were introduced in Kafka's
>>>>> message format in Kafka 0.10). This might happen if you consume a
>>>>> topic that is written by old Kafka producer clients (i.e.,
>>>>> version 0.9 or earlier) or by third-party producer clients. A
>>>>> common situation where this may happen is after upgrading your
>>>>> Kafka cluster from 0.9 to 0.10, where all the data that was
>>>>> generated with 0.9 is not compatible with the 0.10 message
>>>>> format. If you are using a custom timestamp extractor, make sure
>>>>> that your extractor is robust to missing timestamps in your
>>>>> records. For example, you can return a default or an estimated
>>>>> timestamp if you cannot extract a valid timestamp (maybe the
>>>>> timestamp field in your data is just missing). You can also
>>>>> switch to processing-time semantics via
>>>>> WallclockTimestampExtractor; whether such a fallback is an
>>>>> appropriate response to this situation depends on your use case.
>>>>> However, as a first step you should identify and fix the root
>>>>> cause of why such problematic records were written to Kafka in
>>>>> the first place. In a second step you may consider applying
>>>>> workarounds (as described above) when dealing with such records,
>>>>> for example if you need to process those records after all.
>>>>> Another option is to regenerate the records with correct
>>>>> timestamps and write them to a new Kafka topic.
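A minimal sketch of the fallback the FAQ describes, assuming the single-argument TimestampExtractor interface of the 0.10.0 Streams API; the class name RobustTimestampExtractor is hypothetical, not part of Kafka:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Hypothetical fallback extractor: use the embedded record timestamp
    // when it is valid, otherwise fall back to wall-clock time. Records
    // written by 0.9 (or older) producers carry no embedded timestamp
    // and report -1 here.
    public class RobustTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            final long timestamp = record.timestamp();
            return timestamp >= 0 ? timestamp : System.currentTimeMillis();
        }
    }

Such an extractor can be registered via the "timestamp.extractor" setting (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG in the 0.10.x API), or replaced wholesale by the built-in WallclockTimestampExtractor for pure processing-time semantics.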
>>>>> When the timestamp extractor causes the problem:
>>>>>
>>>>> In this situation you should debug and fix the erroneous
>>>>> extractor. If the extractor is built into Kafka, please report
>>>>> the bug to the Kafka developer mailing list at
>>>>> d...@kafka.apache.org (see instructions at
>>>>> http://kafka.apache.org/contact); in the meantime, you may write
>>>>> a custom timestamp extractor that fixes the problem and configure
>>>>> your application to use that extractor for the time being.
>>>>
>>>> To address your questions more concretely:
>>>>
>>>> 1. Yes and no: yes for any new data you write to the topic; no for
>>>>    any already-written data that does not have a valid timestamp set.
>>>> 2. The default is CreateTime.
>>>> 3. The config parameter is "message.timestamp.type". It is a
>>>>    broker-side, per-topic setting (however, be aware that the Java
>>>>    KafkaProducer verifies the timestamp locally before sending the
>>>>    message to the broker, so on -1 you get the client-side
>>>>    exception you observed).
>>>> 4. I assume that you consume different topics with different
>>>>    timestamp fields in your records.
>>>>
>>>> Also have a look at:
>>>> http://docs.confluent.io/current/streams/concepts.html#time
>>>>
>>>> -Matthias
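To make answer 3 concrete, a small illustration of the client-side check: constructing a ProducerRecord with a negative timestamp fails locally, before anything is sent to a broker, regardless of the topic's message.timestamp.type setting. The topic, key, and value names below are placeholders:

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TimestampCheckDemo {
        public static void main(String[] args) {
            // The ProducerRecord constructor validates the timestamp
            // locally: a negative (non-null) value is rejected before
            // the record is even handed to a producer, so the broker's
            // message.timestamp.type setting never comes into play.
            new ProducerRecord<String, String>("my-topic", null, -1L, "key", "value");
            // throws java.lang.IllegalArgumentException: Invalid timestamp -1
        }
    }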
>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the
>>>>> last mail. And I am using Kafka within a DC/OS cluster on AWS.
>>>>>
>>>>> The version that I mentioned works fine on my local machine using
>>>>> a local Kafka installation. And it works in both the single-broker
>>>>> and the multi-broker scenario.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
>>>>> <ghosh.debas...@gmail.com> wrote:
>>>>>> Hello -
>>>>>>
>>>>>> I am a beginner in Kafka .. with my first Kafka Streams
>>>>>> application ..
>>>>>>
>>>>>> I have a streams application that reads from a topic, does some
>>>>>> transformation on the data, and writes to another topic. The
>>>>>> record that I manipulate is a CSV record.
>>>>>>
>>>>>> It runs fine when I run it on a local Kafka instance.
>>>>>>
>>>>>> However, when I run it on an AWS cluster, I get the following
>>>>>> exception when I try to produce the transformed record into the
>>>>>> target topic:
>>>>>>
>>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>>> Looks like the timestamp passed to the ProducerRecord is -1,
>>>>>> though I am not passing any timestamp explicitly. I am not sure
>>>>>> why this happens. But I see that the Javadoc for ProducerRecord
>>>>>> says the following:
>>>>>>
>>>>>>> The record also has an associated timestamp. If the user did
>>>>>>> not provide a timestamp, the producer will stamp the record
>>>>>>> with its current time. The timestamp eventually used by Kafka
>>>>>>> depends on the timestamp type configured for the topic. If the
>>>>>>> topic is configured to use CreateTime, the timestamp in the
>>>>>>> producer record will be used by the broker. If the topic is
>>>>>>> configured to use LogAppendTime, the timestamp in the producer
>>>>>>> record will be overwritten by the broker with the broker local
>>>>>>> time when it appends the message to its log. In either of the
>>>>>>> cases above, the timestamp that has actually been used will be
>>>>>>> returned to the user in RecordMetadata.
>>>>>>
>>>>>> 1. Will this problem be solved if I configure the topic with
>>>>>>    LogAppendTime or CreateTime explicitly?
>>>>>> 2. What is the default setting of this property on a newly
>>>>>>    created topic?
>>>>>> 3. How do I change it (what is the name of the property to be
>>>>>>    set)?
>>>>>> 4. Any idea why I face this problem in cluster mode but not in
>>>>>>    local mode?
>>>>>>
>>>>>> BTW, I am using 0.10.1.
>>>>>>
>>>>>> Any help / pointers will be appreciated.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> -- Debasish Ghosh
>>>>>> http://manning.com/ghosh2
>>>>>> http://manning.com/ghosh
>>>>>>
>>>>>> Twttr: @debasishg
>>>>>> Blog: http://debasishg.blogspot.com
>>>>>> Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg