
Btw: I would highly recommend using Kafka 0.10.1 -- there are many
new Streams features, usability improvements, and bug fixes.

-Matthias

On 10/28/16 11:42 PM, Matthias J. Sax wrote:
> That sounds reasonable. However, I am wondering how your Streams
> application can connect to a 0.9 broker in the first place. Streams
> internally uses the standard Kafka clients, and those are not
> backward compatible. Thus, the 0.10 Streams clients should not be
> able to connect to a 0.9 broker.
> 
> In case you do have 0.10 brokers, it might however happen that
> bin/kafka-console-producer.sh uses a 0.9 producer. Brokers are
> backward compatible, thus a 0.9 producer can write to a 0.10 broker
> (and in this case the record timestamp would be invalid). I assume
> that in your local environment you are using the 0.10
> bin/kafka-console-producer.sh, and thus all works fine.
> 
> 
> -Matthias
> 
> 
> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>> Hello Matthias -
> 
>> Thanks a lot for the response. I think what may be happening is
>> a version mismatch between the development & deployment versions
>> of Kafka. The Kafka Streams application that I developed uses
>> 0.10.0-based libraries, and my local environment contains a
>> server installation of the same version. Hence it works OK in my
>> local environment.
> 
>> But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service
>> through the DC/OS CLI, and that is the version I use to load
>> records into the input topic. I then try to consume with the
>> deployed Streams application, which I developed using 0.10.0.
>> Hence the producer did not set a timestamp while the consumer
>> expects to find one.
> 
>> I need to check if 0.10.x is available for DC/OS ..
> 
>> Thanks again for your suggestions.
> 
> 
>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax 
>> <matth...@confluent.io> wrote:
> 
>> Hey,
> 
>> we just added a new FAQ entry for the upcoming CP 3.2 release
>> that answers your question. I am copying it here; a more concrete
>> answer follows below.
> 
>>>>> If you get an exception similar to the one shown below,
>>>>> there are multiple possible causes:
>>>>> 
>>>>> Exception in thread "StreamThread-1"
>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>> 
>>>>> This error means that the timestamp extractor of your Kafka
>>>>>  Streams application failed to extract a valid timestamp
>>>>> from a record. Typically, this points to a problem with the
>>>>> record (e.g., the record does not contain a timestamp at
>>>>> all), but it could also indicate a problem or bug in the
>>>>> timestamp extractor used by the application.
>>>>> 
>>>>> When does a record not contain a valid timestamp:
>>>>> 
>>>>> If you are using the default
>>>>> ConsumerRecordTimestampExtractor, it is most likely that
>>>>> your records do not carry an embedded timestamp (embedded
>>>>> record timestamps were introduced into Kafka's message
>>>>> format in Kafka 0.10). This might happen if you consume a
>>>>> topic that is written by old Kafka producer clients (i.e.,
>>>>> version 0.9 or earlier) or by third-party producer clients.
>>>>> A common situation where this may happen is after upgrading
>>>>> your Kafka cluster from 0.9 to 0.10, where all the data
>>>>> that was generated with 0.9 does not carry the embedded
>>>>> timestamps of the 0.10 message format. If you are using a
>>>>> custom timestamp extractor, make sure that your extractor
>>>>> is robust to missing timestamps in your records. For
>>>>> example, you can return a default or estimated timestamp if
>>>>> you cannot extract a valid timestamp (maybe the timestamp
>>>>> field in your data is just missing); see the sketch below.
>>>>> You can also switch to processing-time semantics via
>>>>> WallclockTimestampExtractor; whether such a fallback is an
>>>>> appropriate response to this situation depends on your use
>>>>> case. However, as a first step you should identify and fix
>>>>> the root cause for why such problematic records were
>>>>> written to Kafka in the first place. In a second step you
>>>>> may consider applying workarounds (as described above) when
>>>>> dealing with such records (for example, if you need to
>>>>> process those records after all). Another option is to
>>>>> regenerate the records with correct timestamps and write
>>>>> them to a new Kafka topic.
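>>>>> 
>>>>> A minimal sketch of such a robust extractor, written against
>>>>> the 0.10.x single-argument TimestampExtractor interface (the
>>>>> class name and the wall-clock fallback are illustrative
>>>>> choices, just one possible policy):
>>>>> 
>>>>>   import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>   import org.apache.kafka.streams.processor.TimestampExtractor;
>>>>> 
>>>>>   public class FallbackTimestampExtractor implements TimestampExtractor {
>>>>>       @Override
>>>>>       public long extract(ConsumerRecord<Object, Object> record) {
>>>>>           long ts = record.timestamp();
>>>>>           // -1 marks "no timestamp" (e.g., data written by a 0.9
>>>>>           // producer); fall back to wall-clock time instead of failing
>>>>>           return ts >= 0 ? ts : System.currentTimeMillis();
>>>>>       }
>>>>>   }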
>>>>> 
>>>>> When the timestamp extractor causes the problem:
>>>>> 
>>>>> In this situation you should debug and fix the erroneous 
>>>>> extractor. If the extractor is built into Kafka, please 
>>>>> report the bug to the Kafka developer mailing list at 
>>>>> d...@kafka.apache.org (see instructions 
>>>>> http://kafka.apache.org/contact); in the meantime, you may 
>>>>> write a custom timestamp extractor that fixes the problem 
>>>>> and configure your application to use that extractor for
>>>>> the time being.
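>>>>> 
>>>>> Such an extractor is plugged in via the application's
>>>>> configuration. A minimal sketch, assuming the hypothetical
>>>>> FallbackTimestampExtractor from above (application id and
>>>>> bootstrap servers are placeholders):
>>>>> 
>>>>>   import java.util.Properties;
>>>>>   import org.apache.kafka.streams.StreamsConfig;
>>>>> 
>>>>>   // inside your application's setup code
>>>>>   Properties props = new Properties();
>>>>>   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
>>>>>   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>>   // use the custom extractor instead of the default one
>>>>>   props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>>             FallbackTimestampExtractor.class.getName());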
> 
> 
>> To address your questions more concretely:
>> 
>> 1. Yes and no: yes for any new data you write to your topic; no
>> for any already-written data that does not have a valid
>> timestamp set.
>> 2. The default is CreateTime.
>> 3. The config parameter is "message.timestamp.type". It is a
>> broker-side, per-topic setting (however, be aware that the Java
>> KafkaProducer also verifies the timestamp locally before sending
>> the message to the broker, thus on -1 you get the client-side
>> exception you observed).
>> 4. I assume that you consume different topics with different
>> timestamp fields in your records.
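>> 
>> To illustrate point 3: the 0.10 Java producer also lets you pin
>> the record timestamp explicitly via the five-argument
>> ProducerRecord constructor. A minimal sketch (topic name, key,
>> and value are placeholders):
>> 
>>   import java.util.Properties;
>>   import org.apache.kafka.clients.producer.KafkaProducer;
>>   import org.apache.kafka.clients.producer.ProducerRecord;
>> 
>>   public class ExplicitTimestampProducer {
>>       public static void main(String[] args) {
>>           Properties props = new Properties();
>>           props.put("bootstrap.servers", "localhost:9092");
>>           props.put("key.serializer",
>>               "org.apache.kafka.common.serialization.StringSerializer");
>>           props.put("value.serializer",
>>               "org.apache.kafka.common.serialization.StringSerializer");
>> 
>>           KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>           // (topic, partition, timestamp, key, value); a null timestamp
>>           // lets the producer stamp the record with its current time
>>           producer.send(new ProducerRecord<>("my-topic", null,
>>               System.currentTimeMillis(), "key", "value"));
>>           producer.close();
>>       }
>>   }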
> 
>> Also have a look at: 
>> http://docs.confluent.io/current/streams/concepts.html#time
> 
> 
> 
>> -Matthias
> 
> 
>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned
>>>>> in the last mail. And I am using Kafka within a DC/OS
>>>>> cluster under AWS.
>>>>> 
>>>>> The setup that I mentioned as working OK is on my local
>>>>> machine, using a local Kafka installation. And it works in
>>>>> both single-broker and multi-broker scenarios.
>>>>> 
>>>>> Thanks.
>>>>> 
>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh 
>>>>> <ghosh.debas...@gmail.com> wrote:
>>>>> 
>>>>>> Hello -
>>>>>> 
>>>>>> I am a beginner in Kafka .. with my first Kafka Streams
>>>>>> application ..
>>>>>> 
>>>>>> I have a streams application that reads from a topic,
>>>>>> does some transformation on the data and writes to
>>>>>> another topic. The record that I manipulate is a CSV
>>>>>> record.
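>>>>>> 
>>>>>> For reference, a minimal sketch of the shape of the topology
>>>>>> in the 0.10.0 API (topic names and the transformation are
>>>>>> placeholders, not my actual code; props is a StreamsConfig
>>>>>> Properties object as usual):
>>>>>> 
>>>>>>   import org.apache.kafka.streams.KafkaStreams;
>>>>>>   import org.apache.kafka.streams.kstream.KStream;
>>>>>>   import org.apache.kafka.streams.kstream.KStreamBuilder;
>>>>>> 
>>>>>>   KStreamBuilder builder = new KStreamBuilder();
>>>>>>   KStream<String, String> input = builder.stream("input-topic");
>>>>>>   // transform each CSV line and write it to the target topic
>>>>>>   input.mapValues(csv -> csv.toUpperCase())
>>>>>>        .to("output-topic");
>>>>>>   new KafkaStreams(builder, props).start();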
>>>>>> 
>>>>>> It runs fine when I run it on a local Kafka instance.
>>>>>> 
>>>>>> However when I run it on an AWS cluster, I get the 
>>>>>> following exception when I try to produce the
>>>>>> transformed record into the target topic.
>>>>>> 
>>>>>> Exception in thread "StreamThread-1"
>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>>> 
>>>>>> Looks like the timestamp passed to the ProducerRecord is 
>>>>>> -1, though I am not passing any timestamp explicitly. I
>>>>>> am not sure why this happens. But I see that the Javadoc
>>>>>> for ProducerRecord says the following ..
>>>>>> 
>>>>>> The record also has an associated timestamp. If the user
>>>>>>> did not provide a timestamp, the producer will stamp the
>>>>>>> record with its current time. The timestamp eventually
>>>>>>> used by Kafka depends on the timestamp type configured
>>>>>>> for the topic. If the topic is configured to use
>>>>>>> CreateTime, the timestamp in the producer record will be
>>>>>>> used by the broker. If the topic is configured to use
>>>>>>> LogAppendTime, the timestamp in the producer record will
>>>>>>> be overwritten by the broker with the broker local time
>>>>>>> when it appends the message to its log. In either of the
>>>>>>> cases above, the timestamp that has actually been used
>>>>>>> will be returned to user in RecordMetadata
>>>>>> 
>>>>>> 
>>>>>> 1. Will this problem be solved if I configure the topic
>>>>>> with LogAppendTime or CreateTime explicitly?
>>>>>> 2. What is the default setting of this property in a newly
>>>>>> created topic?
>>>>>> 3. How do I change it (what is the name of the property to
>>>>>> be set)?
>>>>>> 4. Any idea why I face this problem in cluster mode but
>>>>>> not in local mode?
>>>>>> 
>>>>>> BTW I am using 0.10.1.
>>>>>> 
>>>>>> Any help / pointer will be appreciated.
>>>>>> 
>>>>>> regards.
>>>>>> 
>>>>>> -- Debasish Ghosh http://manning.com/ghosh2 
>>>>>> http://manning.com/ghosh
>>>>>> 
>>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com 
>>>>>> Code: http://github.com/debasishg
>>>>>> 