Btw: I would highly recommend using Kafka 0.10.1 -- it contains many new Streams features, usability improvements, and bug fixes.
-Matthias

On 10/28/16 11:42 PM, Matthias J. Sax wrote:

> That sounds reasonable. However, I am wondering how your Streams application can connect to a 0.9 broker in the first place. Streams internally uses the standard Kafka clients, and those are not backward compatible. Thus, the 0.10 Streams clients should not be able to connect to a 0.9 broker.
>
> In case you do have 0.10 brokers, it might however happen that bin/kafka-console-producer.sh uses a 0.9 producer. Brokers are backward compatible, so a 0.9 producer can write to a 0.10 broker (and in this case the record timestamp would be invalid). I assume that in your local environment you are using the 0.10 bin/kafka-console-producer.sh, and thus everything works fine.
>
> -Matthias

On 10/28/16 11:00 PM, Debasish Ghosh wrote:

> Hello Matthias -
>
> Thanks a lot for the response. I think what may be happening is a version mismatch between the development and deployment versions of Kafka. The Kafka Streams application that I developed uses 0.10.0-based libraries, and my local environment runs a server installation of the same version. Hence it works ok in my local environment.
>
> But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service through the DC/OS CLI, and I use this version to load records into the input topic. I then try to consume with the deployed Streams application, which I built against 0.10.0. Hence the producer did not put a timestamp on the records, while the consumer expects one.
>
> I need to check if 0.10.x is available for DC/OS ..
>
> Thanks again for your suggestions.

On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hey,
>
> we just added a new FAQ entry for the upcoming CP 3.2 release that answers your question. I just c&p it here. More concrete answers below.
>
>> If you get an exception similar to the one shown below, there are multiple possible causes:
>>
>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>
>> This error means that the timestamp extractor of your Kafka Streams application failed to extract a valid timestamp from a record. Typically, this points to a problem with the record (e.g., the record does not contain a timestamp at all), but it could also indicate a problem or bug in the timestamp extractor used by the application.
>>
>> When does a record not contain a valid timestamp?
>>
>> If you are using the default ConsumerRecordTimestampExtractor, it is most likely that your records do not carry an embedded timestamp (embedded record timestamps were introduced in Kafka's message format in Kafka 0.10). This might happen if you consume a topic that is written to by old Kafka producer clients (i.e., version 0.9 or earlier) or by third-party producer clients. A common situation where this may happen is after upgrading your Kafka cluster from 0.9 to 0.10, where all the data that was generated with 0.9 is not compatible with the 0.10 message format. If you are using a custom timestamp extractor, make sure that your extractor is robust to missing timestamps in your records. For example, you can return a default or estimated timestamp if you cannot extract a valid timestamp (maybe the timestamp field in your data is just missing). You can also switch to processing-time semantics via WallclockTimestampExtractor; whether such a fallback is an appropriate response to this situation depends on your use case.
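A minimal sketch of such a fallback extractor, assuming the single-argument TimestampExtractor interface of the 0.10.x Streams API (the class name FallbackTimestampExtractor is illustrative, not an existing Kafka class):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Fallback extractor: use the embedded record timestamp when it is valid,
    // and fall back to wall-clock time (processing-time semantics) for records
    // that carry no timestamp, e.g. data written by pre-0.10 producers.
    public class FallbackTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            final long timestamp = record.timestamp();
            return timestamp >= 0 ? timestamp : System.currentTimeMillis();
        }
    }

Such an extractor would then be registered via the application's StreamsConfig, e.g. props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FallbackTimestampExtractor.class.getName()).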
>> However, as a first step you should identify and fix the root cause for why such problematic records were written to Kafka in the first place. In a second step you may consider applying workarounds (as described above) when dealing with such records, for example if you need to process those records after all. Another option is to regenerate the records with correct timestamps and write them to a new Kafka topic.
>>
>> When the timestamp extractor causes the problem:
>>
>> In this situation you should debug and fix the erroneous extractor. If the extractor is built into Kafka, please report the bug to the Kafka developer mailing list at d...@kafka.apache.org (see instructions at http://kafka.apache.org/contact); in the meantime, you may write a custom timestamp extractor that fixes the problem and configure your application to use that extractor for the time being.
>
> To address your questions more concretely:
>
> 1. Yes and no: yes for any new data you write to the topic; no for any already-written data that does not have a valid timestamp set.
> 2. The default is CreateTime.
> 3. The config parameter is "message.timestamp.type". It is a broker-side, per-topic setting (however, be aware that the Java KafkaProducer verifies the timestamp locally before sending the message to the broker, so on -1 you get the client-side exception you observed).
> 4. I assume that you consume different topics with different timestamp fields in your records.
>
> Also have a look at: http://docs.confluent.io/current/streams/concepts.html#time
>
> -Matthias
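For the regeneration workaround mentioned in the FAQ above, and to illustrate point 3 (the Java producer validates the timestamp client-side before the broker ever sees it), here is a rough sketch of republishing data with explicit, valid timestamps. It assumes the 0.10.x Java producer; the topic name, serializers, bootstrap address, and the way the timestamp is derived are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RepublishWithTimestamps {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // In a real regeneration job the timestamp would be derived from the
                // old record's payload; wall-clock time is used here only as a stand-in.
                long timestamp = System.currentTimeMillis();
                // The 0.10 ProducerRecord constructor accepts an explicit timestamp.
                // It must be non-negative: ProducerRecord.<init> is exactly where the
                // "Invalid timestamp -1" IllegalArgumentException above is thrown.
                producer.send(new ProducerRecord<>("csv-records-fixed", null, timestamp, "key", "a,csv,record"));
            }
        }
    }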
On 10/28/16 5:42 AM, Debasish Ghosh wrote:

> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the last mail. And I am using Kafka within a DC/OS cluster on AWS.
>
> The version that I mentioned works ok on my local machine using a local Kafka installation. And it works for both the single-broker and multi-broker scenarios.
>
> Thanks.

On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hello -
>
> I am a beginner in Kafka .. with my first Kafka Streams application ..
>
> I have a Streams application that reads from a topic, does some transformation on the data and writes to another topic. The record that I manipulate is a CSV record.
>
> It runs fine when I run it on a local Kafka instance. However, when I run it on an AWS cluster, I get the following exception when I try to produce the transformed record into the target topic.
>
> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> Looks like the timestamp passed to the ProducerRecord is -1, though I am not passing any timestamp explicitly. I am not sure why this happens. But I see that the Javadoc for ProducerRecord says the following ..
>
>> The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for the topic. If the topic is configured to use CreateTime, the timestamp in the producer record will be used by the broker. If the topic is configured to use LogAppendTime, the timestamp in the producer record will be overwritten by the broker with the broker local time when it appends the message to its log. In either of the cases above, the timestamp that has actually been used will be returned to the user in RecordMetadata.
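A quick way to check whether the records in the input topic actually carry valid embedded timestamps (e.g., whether they were written by a pre-0.10 producer) is to read a few of them back and print the timestamp and timestamp type. A small sketch assuming the 0.10.x Java consumer; the connection settings, group id, and topic name are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class InspectTimestamps {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("group.id", "timestamp-inspector");        // placeholder
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("input-topic"));  // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(5000);
                for (ConsumerRecord<String, String> record : records) {
                    // Records written by pre-0.10 producers report a timestamp of -1 here,
                    // which is what the default extractor then hands to Kafka Streams.
                    System.out.println(record.offset() + " " + record.timestampType() + " " + record.timestamp());
                }
            }
        }
    }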
> 1. Will this problem be solved if I configure the topic with LogAppendTime or CreateTime explicitly?
> 2. What is the default setting of this property for a newly created topic?
> 3. How do I change it (what is the name of the property to be set)?
> 4. Any idea why I face this problem in cluster mode but not in local mode?
>
> BTW I am using 0.10.1.
>
> Any help / pointer will be appreciated.
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg