Hi,

The WallclockTimestampExtractor is only applied to source topics (this is a correctness fix included in the 1.0 release: https://issues.apache.org/jira/browse/KAFKA-4785). When records are written into the repartition topics, they carry the timestamp of the input record (ie, whatever WallclockTimestampExtractor returned), and thus the written timestamp is not negative and FailOnInvalidTimestamp can pick it up.
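Since the extractor is applied only when Kafka Streams reads a source topic, one way to get it applied again mid-topology is to route the stream through an explicit, user-managed topic (this is the `to()`/`.stream()` workaround mentioned below). A minimal sketch against the Kafka Streams 1.0 API; the topic names and serdes are illustrative, not from the original thread:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ExplicitRepartitionSketch {

    public static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic");

        // Instead of letting Streams create an internal repartition topic,
        // write to an explicit, user-managed topic ...
        input.selectKey((key, value) -> value)
             .to("my-repartition-topic",
                 Produced.with(Serdes.String(), Serdes.String()));

        // ... and read it back. Because it is now a regular source topic,
        // the configured default TimestampExtractor (e.g.
        // WallclockTimestampExtractor) is applied to these records, too.
        KStream<String, String> repartitioned =
            builder.stream("my-repartition-topic",
                           Consumed.with(Serdes.String(), Serdes.String()));

        repartitioned.groupByKey().count();
    }
}
```

Note that with this workaround you have to create and manage "my-repartition-topic" yourself (partition count, retention, cleanup policy), since it is no longer an internal topic.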
One explanation for negative timestamps in repartition topics is that the on-disk message format of the topic was not upgraded to the 0.10 format -- this is a requirement for storing timestamps. In that case, even if a positive timestamp is included in the records written into the repartition topics, the timestamps are not stored; when the records are read back, the broker just puts a `-1` into the record. Thus, you should upgrade the message format to 0.10+ to allow Kafka Streams to "internally forward" timestamps.

If you cannot upgrade the message format, one workaround is to write to the repartition topics explicitly via `to()` and read them back explicitly via `.stream()`. In this case, the topics are treated as source topics and WallclockTimestampExtractor is applied.

-Matthias

On 3/22/18 7:28 AM, Shelan Perera wrote:
> Hi,
>
> We are trying to run a Kafka Streams application against a Kafka cluster,
> and some of the incoming messages have negative timestamps because some of
> the producers are using an older version of Kafka.
>
> Therefore we used WallclockTimestampExtractor to patch those timestamps.
> But we also read in the documentation that this will not have any effect
> on the internally created topics for the streams (repartition topics).
>
> It is strange that internal records have negative timestamps, as we are
> using the latest Kafka version for the Kafka Streams application.
>
> Error message we get
> ======================
>
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp - Input record
> ConsumerRecord(topic = my-app-KSTREAM-TRANSFORM-0000000009-repartition,
> partition = 6, offset = 18, CreateTime = -1, serialized key size = 8,
> serialized value size = 104, headers = RecordHeaders(headers = [],
> isReadOnly = false), key = 13143752, value = <our message>) has invalid
> (negative) timestamp. Possibly because a pre-0.10 producer client was used
> to write this record to Kafka without embedding a timestamp, or because the
> input topic was created before upgrading the Kafka cluster to 0.10+. Use a
> different TimestampExtractor to process this data.
>
> Properties in the code
> =======================
>
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class);
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> kafkaStreamsProperties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
> LogAndContinueExceptionHandler.class);
>
> Kafka cluster version: kafka_2.12-0.10.2.1
> Kafka version in streams application: 1.0.0
>
> We have the same streams application running on two instances consuming
> the messages from a Kafka cluster.
>
> What could be the reason the streams application fails on negative
> timestamps while we have used the WallclockTimestampExtractor?
>
> Thank you,
>
> Best Regards,
> Shelan Perera