That sounds reasonable. However, I am wondering how your Streams
application can connect to a 0.9 broker in the first place. Streams
internally uses the standard Kafka clients, and those are not backward
compatible. Thus, the 0.10 Streams clients should not be able to
connect to a 0.9 broker.

In case you do have 0.10 brokers, it might however happen that
bin/kafka-console-producer.sh uses the 0.9 producer. Brokers are
backward compatible, thus a 0.9 producer can write to a 0.10 broker
(and in this case the record timestamp would be invalid). I assume
that in your local environment you are using the 0.10
bin/kafka-console-producer.sh, and thus everything works fine.
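
If you want to double-check what the records in the input topic
actually carry, one option is to read a few of them with a plain 0.10
consumer and print the timestamp and timestamp type. This is only a
sketch; the broker address, topic name, and group id are placeholders
for your setup:

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class TimestampCheck {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "broker:9092");  // placeholder
          props.put("group.id", "ts-check");              // placeholder
          props.put("auto.offset.reset", "earliest");
          props.put("key.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
          try (KafkaConsumer<String, String> consumer =
                   new KafkaConsumer<>(props)) {
              consumer.subscribe(Collections.singletonList("input-topic"));
              // a single poll may be empty; poll again if needed
              ConsumerRecords<String, String> records = consumer.poll(5000);
              for (ConsumerRecord<String, String> r : records) {
                  // records written in the old (pre-0.10) message format
                  // should show timestamp -1 and NO_TIMESTAMP_TYPE here
                  System.out.println(r.offset() + " " + r.timestamp()
                      + " " + r.timestampType());
              }
          }
      }
  }

If the records show -1 there, they were written without an embedded
timestamp, and you end up with the exception you saw when Streams
tries to write downstream.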


-Matthias


On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> Hello Matthias -
> 
> Thanks a lot for the response. I think what may be happening is a
> version mismatch between the development & deployment versions of
> Kafka. The Kafka Streams application that I developed uses
> 0.10.0-based libraries, and my local environment contains a server
> installation of the same version. Hence it works fine in my local
> environment.
> 
> But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service
> through the DC/OS CLI. I use this version to load records into the
> input topic and then try to consume them with the deployed Streams
> application, which I developed against 0.10.0. Hence the producer
> did not put a timestamp on the records while the consumer expects
> one.
> 
> I need to check if 0.10.x is available for DC/OS ..
> 
> Thanks again for your suggestions.
> 
> 
> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> Hey,
> 
> we just added a new FAQ entry for the upcoming CP 3.2 release that
> answers your question. I just copy & paste it here. A more concrete
> answer is below.
> 
>>>> If you get an exception similar to the one shown below, there
>>>> are multiple possible causes:
>>>> 
>>>> Exception in thread "StreamThread-1"
>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>   at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>> 
>>>> This error means that the timestamp extractor of your Kafka 
>>>> Streams application failed to extract a valid timestamp from
>>>> a record. Typically, this points to a problem with the record
>>>> (e.g., the record does not contain a timestamp at all), but
>>>> it could also indicate a problem or bug in the timestamp
>>>> extractor used by the application.
>>>> 
>>>> When does a record not contain a valid timestamp:
>>>> 
>>>> If you are using the default
>>>> ConsumerRecordTimestampExtractor, it is most likely that your
>>>> records do not carry an embedded timestamp (embedded record
>>>> timestamps got introduced in Kafka's message format in Kafka
>>>> 0.10). This might happen if you consume a topic that is
>>>> written by old Kafka producer clients (i.e., version 0.9 or
>>>> earlier) or by third-party producer clients. A common situation
>>>> where this may happen is after upgrading your Kafka cluster
>>>> from 0.9 to 0.10, where all the data that was generated with
>>>> 0.9 is not compatible with the 0.10 message format. If you
>>>> are using a custom timestamp extractor, make sure that your
>>>> extractor is robust to missing timestamps in your records.
>>>> For example, you can return a default or estimated timestamp
>>>> if you cannot extract a valid timestamp (maybe the timestamp
>>>> field in your data is just missing). You can also switch to
>>>> processing time semantics via WallclockTimestampExtractor;
>>>> whether such a fallback is an appropriate response to this
>>>> situation depends on your use case. However, as a first step
>>>> you should identify and fix the root cause for why such
>>>> problematic records were written to Kafka in the first place.
>>>> In a second step you may consider applying workarounds (as 
>>>> described above) when dealing with such records (for example,
>>>> if you need to process those records after all). Another
>>>> option is to regenerate the records with correct timestamps
>>>> and write them to a new Kafka topic.
>>>> 
>>>> When the timestamp extractor causes the problem:
>>>> 
>>>> In this situation you should debug and fix the erroneous
>>>> extractor. If the extractor is built into Kafka, please
>>>> report the bug to the Kafka developer mailing list at
>>>> d...@kafka.apache.org (see instructions
>>>> http://kafka.apache.org/contact); in the meantime, you may
>>>> write a custom timestamp extractor that fixes the problem
>>>> and configure your application to use that extractor for the
>>>> time being.
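> 
> For the "robust custom extractor" option mentioned above, a minimal
> sketch against the 0.10.0 TimestampExtractor interface could look
> like the following (the class name and the wall-clock fallback are
> just an example, not something that ships with Kafka):
> 
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.streams.processor.TimestampExtractor;
> 
>     public class FallbackTimestampExtractor implements TimestampExtractor {
>         @Override
>         public long extract(ConsumerRecord<Object, Object> record) {
>             long ts = record.timestamp();
>             // records in the old (pre-0.10) message format carry no
>             // embedded timestamp and show up as -1 here; fall back
>             // to wall-clock time instead of failing
>             return ts >= 0 ? ts : System.currentTimeMillis();
>         }
>     }
> 
> and you would register it in your StreamsConfig, e.g.
> 
>     props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>               FallbackTimestampExtractor.class.getName());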
> 
> 
> To address your questions more concretely:
> 
> 1. Yes and no: yes, for any new data you write to your topic; no,
>    for any already written data that does not have a valid
>    timestamp set.
> 2. The default is CreateTime.
> 3. The config parameter is "message.timestamp.type". It's a
>    broker-side, per-topic setting (however, be aware that the Java
>    KafkaProducer verifies the timestamp locally before sending the
>    message to the broker, thus on -1 you get the client-side
>    exception you observed). See the example below.
> 4. I assume that you consume different topics with different
>    timestamp fields in your records.
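> 
> For 3., on a 0.10 broker something like the following should set it
> per topic (topic name and ZooKeeper address are placeholders for
> your setup):
> 
>     bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
>       --entity-type topics --entity-name my-topic \
>       --add-config message.timestamp.type=LogAppendTime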
> 
> Also have a look at: 
> http://docs.confluent.io/current/streams/concepts.html#time
> 
> 
> 
> -Matthias
> 
> 
> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in
>>>> the last mail. And I am using Kafka within a DC/OS cluster
>>>> under AWS.
>>>> 
>>>> The version that I mentioned works OK on my local machine, using
>>>> a local Kafka installation. And it works in both single-broker
>>>> and multi-broker scenarios.
>>>> 
>>>> Thanks.
>>>> 
>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh 
>>>> <ghosh.debas...@gmail.com> wrote:
>>>> 
>>>>> Hello -
>>>>> 
>>>>> I am a beginner in Kafka .. with my first Kafka streams 
>>>>> application ..
>>>>> 
>>>>> I have a streams application that reads from a topic, does
>>>>> some transformation on the data and writes to another
>>>>> topic. The record that I manipulate is a CSV record.
>>>>> 
>>>>> It runs fine when I run it on a local Kafka instance.
>>>>> 
>>>>> However when I run it on an AWS cluster, I get the
>>>>> following exception when I try to produce the transformed
>>>>> record into the target topic.
>>>>> 
>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>   at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>> 
>>>>> Looks like the timestamp passed to the ProducerRecord is
>>>>> -1, though I am not passing any timestamp explicitly. I am
>>>>> not sure why this happens. But I see that the Javadoc for
>>>>> ProducerRecord says the following ..
>>>>> 
>>>>>> The record also has an associated timestamp. If the user
>>>>>> did not provide a timestamp, the producer will stamp the
>>>>>> record with its current time. The timestamp eventually
>>>>>> used by Kafka depends on the timestamp type configured for
>>>>>> the topic. If the topic is configured to use CreateTime,
>>>>>> the timestamp in the producer record will be used by the
>>>>>> broker. If the topic is configured to use LogAppendTime,
>>>>>> the timestamp in the producer record will be overwritten
>>>>>> by the broker with the broker local time when it appends
>>>>>> the message to its log. In either of the cases above, the
>>>>>> timestamp that has actually been used will be returned to
>>>>>> user in RecordMetadata
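>>>>> 
>>>>> (For reference, the 0.10 ProducerRecord constructors also allow
>>>>> passing an explicit timestamp, e.g.
>>>>>     new ProducerRecord<>("target-topic", null,
>>>>>         System.currentTimeMillis(), key, value);
>>>>> where topic name, key and value are placeholders.)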
>>>>> 
>>>>> 
>>>>> 1. Will this problem be solved if I configure the topic with
>>>>>    LogAppendTime or CreateTime explicitly?
>>>>> 2. What is the default setting of this property in a newly
>>>>>    created topic?
>>>>> 3. How do I change it (what is the name of the property to be
>>>>>    set)?
>>>>> 4. Any idea why I face this problem in the cluster mode but not
>>>>>    in the local mode?
>>>>> 
>>>>> BTW I am using 0.10.1.
>>>>> 
>>>>> Any help / pointer will be appreciated.
>>>>> 
>>>>> regards.
>>>>> 
>>>>> -- Debasish Ghosh http://manning.com/ghosh2 
>>>>> http://manning.com/ghosh
>>>>> 
>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>> 