Hi,

the WallclockTimestampExtractor is only applied to source topics (this
is a correctness fix included in the 1.0 release:
https://issues.apache.org/jira/browse/KAFKA-4785). When records are
written into repartition topics, they carry the timestamp of the input
record (ie, whatever WallclockTimestampExtractor returned), and thus the
written timestamp is not negative and FailOnInvalidTimestamp can pick it
up.

One explanation for negative timestamps in repartition topics would be
that the on-disk message format of the topic was not upgraded to the
0.10 format -- this is a requirement for storing timestamps. Thus, even
if a positive timestamp is included in the records that are written into
the repartition topics, the timestamps are not stored. When the records
are read back, the broker just puts a `-1` into the record.

Thus, you should upgrade the message format to 0.10+ to allow Kafka
Streams to "internally forward" timestamps.
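
Since your brokers are on 0.10.2.1, one thing to check is whether the
message format is still pinned to an older version (common after a
rolling upgrade). A sketch of what to look at -- the topic name below is
just the one from your error message, and you should follow the broker
upgrade docs rather than take this verbatim:

```
# broker config (server.properties): the pin, if present, looks like
# log.message.format.version=0.9.0 -- remove it or raise it, eg:
log.message.format.version=0.10.2

# or override per topic (ships with 0.10.2):
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics \
  --entity-name my-app-KSTREAM-TRANSFORM-0000000009-repartition \
  --add-config message.format.version=0.10.2
```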

If you cannot upgrade the message format, one workaround would be to
write to repartition topics explicitly via `to()` and read them back
explicitly via `stream()`. In this case, the topics are treated as
source topics and WallclockTimestampExtractor will be applied.
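
A minimal sketch of this workaround with the 1.0 DSL (topic, class, and
key-selector names are made up for illustration; it assumes the
kafka-streams 1.0.0 dependency and a pre-created topic, since Streams
does not auto-create user topics):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ManualRepartition {

    static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic");

        // Instead of letting Streams create an internal repartition
        // topic, write the re-keyed stream out explicitly:
        input.selectKey((key, value) -> value)
             .to("my-app-manual-repartition",
                 Produced.with(Serdes.String(), Serdes.String()));

        // ...and read it back. This topic is now a source topic, so the
        // configured WallclockTimestampExtractor is applied on read and
        // replaces any -1 timestamps from the broker.
        KStream<String, String> repartitioned =
            builder.stream("my-app-manual-repartition",
                           Consumed.with(Serdes.String(), Serdes.String()));

        repartitioned.groupByKey().count();
    }
}
```

The price of this workaround is an extra user-managed topic, but it
restores the extractor semantics you were expecting.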


-Matthias


On 3/22/18 7:28 AM, Shelan Perera wrote:
> Hi,
> 
> We are trying to run a Kafka Streams application against a Kafka cluster,
> and some of the incoming messages have negative timestamps because some of
> the producers are using an older version of Kafka.
> 
> Therefore we used WallclockTimestampExtractor to patch those timestamps.
> But we also read in the documentation that this will not have any effect
> on the internally created topics for the streams (repartition topics).
> 
> It is strange that internal records have negative timestamps, as we are
> using the latest Kafka version for the Kafka Streams application.
> 
> Error message we get
> ======================
> 
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp - Input record
> ConsumerRecord(topic = my-app-KSTREAM-TRANSFORM-0000000009-repartition,
> partition = 6, offset = 18, CreateTime = -1, serialized key size = 8,
> serialized value size = 104, headers = RecordHeaders(headers = [],
> isReadOnly = false), key = 13143752, value = <our message>) has invalid
> (negative) timestamp. Possibly because a pre-0.10 producer client was used
> to write this record to Kafka without embedding a timestamp, or because the
> input topic was created before upgrading the Kafka cluster to 0.10+. Use a
> different TimestampExtractor to process this data.
> 
> Properties in the code
> =======================
> 
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,WallclockTimestampExtractor.class);
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
> LogAndContinueExceptionHandler.class);
> 
> Kafka cluster version : kafka_2.12-0.10.2.1
> Kafka version in streams application : 1.0.0
> 
> We have the same streams application running on two instances consuming
> messages from a Kafka cluster.
> 
> What could be the reason the streams application fails on negative
> timestamps while we have used WallclockTimestampExtractor?
> 
> Thank you,
> 
> Best Regards,
> Shelan Perera
> 
