I will check out all options that u mentioned. I am sure on my local it's
all 0.10.0, so no wonder it works correctly. In The cluster, I just checked
the version of Kafka that ships with DC/OS 1.8 (the version I get with dcos
package install kafka) is 0.9.0. Regarding ..
In case you do have 0.10 brokers, it might however happen, that
> does use 0.9 producer.

How can I check this ?


On Sat, Oct 29, 2016 at 12:12 PM, Matthias J. Sax <matth...@confluent.io>

> Hash: SHA512
> That sounds reasonable. However, I am wondering how your Streams
> application can connect to 0.9 broker in the first place. Streams
> internally uses standard Kafka clients, and those are not backward
> compatible. Thus, the 0.10 Streams clients should not be able to
> connect to 0.9 broker.
> In case you do have 0.10 brokers, it might however happen, that
> bin/kafka-console-producer.sh does use 0.9 producer. Broker are
> backward compatible, thus, a 0.9 producer can write to 0.10 broker
> (and in this case record TS would be invalid). While I assume that in
> you local environment you are using 0.10 bin/kafka-console-produer.sh
> and thus all works fine.
> - -Matthias
> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> > Hello Mathias -
> >
> > Thanks a lot for the response. I think what may be happening is a
> > version mismatch between the development & deployment versions of
> > Kafka. The Kafka streams application that I developed uses 0.10.0
> > based libraries. And my local environment contains a server
> > installation of the same version. Hence it works ok in my local
> > environment.
> >
> > But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the service
> > through DC/OS cli. And I use this version to load records into the
> > input topic. And try to consume using the deployed streams
> > application which I developed using 0.10.0. Hence the producer did
> > not put the timestamp while the consumer expects to have one.
> >
> > I need to check if 0.10.x is available for DC/OS ..
> >
> > Thanks again for your suggestions.
> >
> >
> > On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Hey,
> >
> > we just added a new FAQ entry for upcoming CP 3.2 release that
> > answers your question. I just c&p it here. More concrete answer
> > below.
> >
> >>>> If you get an exception similar to the one shown below, there
> >>>> are multiple possible causes:
> >>>>
> >>>> Exception in thread "StreamThread-1"
> >>>> java.lang.IllegalArgumentException: Invalid timestamp -1 at
> >>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRec
> ord
> >
> >>>>
> .java:60)
> >>>>
> >>>> This error means that the timestamp extractor of your Kafka
> >>>> Streams application failed to extract a valid timestamp from
> >>>> a record. Typically, this points to a problem with the record
> >>>> (e.g., the record does not contain a timestamp at all), but
> >>>> it could also indicate a problem or bug in the timestamp
> >>>> extractor used by the application.
> >>>>
> >>>> When does a record not contain a valid timestamp:
> >>>>
> >>>> If you are using the default
> >>>> ConsumerRecordTimestampExtractor, it is most likely that your
> >>>> records do not carry an embedded timestamp (embedded record
> >>>> timestamps got introduced in Kafka's message format in Kafka
> >>>> 0.10). This might happen, if you consume a topic that is
> >>>> written by old Kafka producer clients (ie, version 0.9 or
> >>>> earlier) or third party producer clients. A common situation
> >>>> where this may happen is after upgrading your Kafka cluster
> >>>> from 0.9 to 0.10, where all the data that was generated with
> >>>> 0.9 is not compatible with the 0.10 message format. If you
> >>>> are using a custom timestamp extractor, make sure that your
> >>>> extractor is robust to missing timestamps in your records.
> >>>> For example, you can return a default or estimated timestamp
> >>>> if you cannot extract a valid timestamp (maybe the timstamp
> >>>> field in your data is just missing). You can also switch to
> >>>> processing time semantics via WallclockTimestampExtractor;
> >>>> whether such a fallback is an appropriate response to this
> >>>> situation depends on your use case. However, as a first step
> >>>> you should identify and fix the root cause for why such
> >>>> problematic records were written to Kafka in the first place.
> >>>> In a second step you may consider applying workarounds (as
> >>>> described above) when dealing with such records (for example,
> >>>> if you need to process those records after all). Another
> >>>> option is to regenerate the records with correct timestamps
> >>>> and write them to a new Kafka topic.
> >>>>
> >>>> When the timestamp extractor causes the problem:
> >>>>
> >>>> In this situation you should debug and fix the erroneous
> >>>> extractor. If the extractor is built into Kafka, please
> >>>> report the bug to the Kafka developer mailing list at
> >>>> d...@kafka.apache.org (see instructions
> >>>> http://kafka.apache.org/contact); in the meantime, you may
> >>>> write a custom timestamp extractor that fixes the problem
> >>>> and configure your application to use that extractor for the
> >>>> time being.
> >
> >
> > To address you questions more concretely:
> >
> > 1. Yes an no: Yes, for any new data you write to you topic. No,
> > for any already written data that does not have a valid timestamp
> > set 2. Default is creating time 3. Config parameter
> > "message.timestamp.type") It's a broker side per topic setting
> > (however, be aware that Java KafkaProducer does verify the
> > timestamp locally before sending the message to the broker, thus on
> > -1 there will be the client side exception you did observe( 4. I
> > assume that you do consumer different topic with different TS
> > fields in you records.
> >
> > Also have a look at:
> > http://docs.confluent.io/current/streams/concepts.html#time
> >
> >
> >
> > -Matthias
> >
> >
> > On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> >>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in
> >>>> the last mail. And I am using Kafka within a DC/OS cluster
> >>>> under AWS.
> >>>>
> >>>> The version that I mentioned works ok is on my local machine
> >>>> using a local Kafka installation. And it works for both
> >>>> single broker and multi broker scenario.
> >>>>
> >>>> Thanks.
> >>>>
> >>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> >>>> <ghosh.debas...@gmail.com> wrote:
> >>>>
> >>>>> Hello -
> >>>>>
> >>>>> I am a beginner in Kafka .. with my first Kafka streams
> >>>>> application ..
> >>>>>
> >>>>> I have a streams application that reads from a topic, does
> >>>>> some transformation on the data and writes to another
> >>>>> topic. The record that I manipulate is a CSV record.
> >>>>>
> >>>>> It runs fine when I run it on a local Kafka instance.
> >>>>>
> >>>>> However when I run it on an AWS cluster, I get the
> >>>>> following exception when I try to produce the transformed
> >>>>> record into the target topic.
> >>>>>
> >>>>> Exception in thread "StreamThread-1"
> >>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>
> >>>>> (ProducerRecord.java:60) at
> >>>>> org.apache.kafka.streams.processor.internals.SinkNode.
> >>>>> process(SinkNode.java:72) at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> StreamTask.forward(StreamTask.java:338) at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at
> >>>>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$
> >>>>>
> >>>>>
> KStreamMapProcessor.process(KStreamMapValues.java:42) at
> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process
> (
> >>>>>
> >>>>>
> >
> >>>>>
> ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.
> >>>>> StreamTask.forward(StreamTask.java:338) at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at
> >>>>> org.apache.kafka.streams.kstream.internals.KStreamPassThrough$
> >>>>>
> >>>>>
> KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>>>> at
> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process
> (
> >>>>>
> >>>>>
> >
> >>>>>
> ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.
> >>>>> StreamTask.forward(StreamTask.java:351) at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>>>> at
> >>>>> org.apache.kafka.streams.kstream.internals.KStreamBranch$
> >>>>> KStreamBranchProcessor.process(KStreamBranch.java:46) at
> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process
> (
> >>>>>
> >>>>>
> >
> >>>>>
> ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.
> >>>>> StreamTask.forward(StreamTask.java:338) at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at
> >>>>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$
> >>>>>
> >>>>>
> KStreamMapProcessor.process(KStreamMapValues.java:42) at
> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process
> (
> >>>>>
> >>>>>
> >
> >>>>>
> ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.
> >>>>> StreamTask.forward(StreamTask.java:338) at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at org.apache.kafka.streams.processor.internals.
> >>>>> SourceNode.process(SourceNode.java:64) at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> StreamTask.process(StreamTask.java:174) at
> >>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> >>>>>
> >>>>>
> >
> >>>>>
> StreamThread.java:320)
> >>>>> at org.apache.kafka.streams.processor.internals.
> >>>>> StreamThread.run(StreamThread.java:218)
> >>>>>
> >>>>> Looks like the timestamp passed to the ProducerRecord is
> >>>>> -1, though I am not passing any timestamp explicitly. I am
> >>>>> not sure why this happens. But I see that the Javadoc for
> >>>>> ProducerRecord says the following ..
> >>>>>
> >>>>> The record also has an associated timestamp. If the user
> >>>>> did not provide a
> >>>>>> timestamp, the producer will stamp the record with its
> >>>>>> current time. The timestamp eventually used by Kafka
> >>>>>> depends on the timestamp type configured for the topic.
> >>>>>> If the topic is configured to use CreateTime, the
> >>>>>> timestamp in the producer record will be used by the
> >>>>>> broker. If the topic is configured to use LogAppendTime,
> >>>>>> the timestamp in the producer record will be overwritten
> >>>>>> by the broker with the broker local time when it appends
> >>>>>> the message to its log. In either of the cases above, the
> >>>>>> timestamp that has actually been used will be returned
> >>>>>> to user in RecordMetadata
> >>>>>
> >>>>>
> >>>>> 1. Will this problem be solved if I configure the topic
> >>>>> with LogAppendTime or CreateTime explicitly ? 2. What is
> >>>>> the default setting of this property in a newly created
> >>>>> topic ? 3. How do I change it (what is the name of the
> >>>>> property to be set) ? 4. Any idea why I face this problem
> >>>>> in the cluster mode but not in the local mode ?
> >>>>>
> >>>>> BTW I am using 0.10.1.
> >>>>>
> >>>>> Any help / pointer will be appreciated ?
> >>>>>
> >>>>> regards.
> >>>>>
> >>>>> -- Debasish Ghosh http://manning.com/ghosh2
> >>>>> http://manning.com/ghosh
> >>>>>
> >>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
> >>>>> Code: http://github.com/debasishg
> >>>>>
> >>>>
> >>>>
> >>>>
> >>
> >
> >
> >
> Comment: GPGTools - https://gpgtools.org
> mkewtvZSN2wgXBiqhouTYvqrU+//gPfIw6aONN8DW6Z5ufbzxRjclA1y9QAXaeQw
> v9tuYxxtPwkw2oMuR3T8o4WI6dEsJao//YAE6fGqx4giWzHvbGZQBqwbidGVFkHE
> K0BC+Rkuod1wJ3t8/2u/JG8YQtSV5b1t/kp6wDGS/yNzxHRhCkKr4nWdHD9y+CKJ
> ej9+CD0wSqTHB5QpMLbbgd1CgX/QeJ8jweqeD7lWrgOWoDZxqnitxHCyQ484kdyw
> ETIJWT02cAj760iZRN8l892rVtZNhpCgdY9uY4l8z9px93qS8P9kjlGDrm+jaPhu
> nIBZnM2OoWEeg5rp52fQSIg37YIuVktrkvwj5C2kH1eo5iAnmJps/cZonP5BVEDR
> T+yyYNPER6ltgpZrRugJggT8TerylA12K5ro3JuoDPxeRQIbAN8JGgqUZofiOzhq
> DY56XVYlYAKcgj+8MqbygI0Q8pORnm2YNmtZgMYWThNuz6j6Nf3MmU5YIAc0g9ge
> 5asNzW2nPxY1N6INGFn5ET4qq8AX9/75LJ+i6tAr8Ddqy0MPt8TFHwE0bCgdPCNo
> Ef99ln2esbPnLujz+PeNuAh1RBab0Hets16jYF2dBwmKElgRCFH1u2gOfMG3cpjl
> WCwK1XhEpIDdB0+pxzyD

Debasish Ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to