Hi Mattias,

Thank you very much for the explanation; it really helps. We found out that
one of the brokers was using an older message format, and that caused the
problem.

Best Regards,
Shelan.

On 22 March 2018 at 23:28, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> the WallclockTimestampExtractor is only applied to source topics (this
> is a correctness fix included in the 1.0 release:
> https://issues.apache.org/jira/browse/KAFKA-4785). When records are
> written into the repartition topics, they get the timestamp of the
> input record (i.e., whatever WallclockTimestampExtractor returned),
> and thus the written timestamp is not negative and
> FailOnInvalidTimestamp can pick it up.
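>
> As an illustration, here is a minimal sketch (not from the Streams
> codebase; the class name is hypothetical) of a custom TimestampExtractor
> that keeps valid embedded timestamps and only falls back to wall-clock
> time for invalid ones:
>
>   import org.apache.kafka.clients.consumer.ConsumerRecord;
>   import org.apache.kafka.streams.processor.TimestampExtractor;
>
>   // Hypothetical extractor: keep valid embedded timestamps, fall back
>   // to wall-clock time for invalid ones (e.g. -1 from a pre-0.10
>   // producer).
>   public class PatchInvalidTimestampExtractor implements TimestampExtractor {
>       @Override
>       public long extract(ConsumerRecord<Object, Object> record,
>                           long previousTimestamp) {
>           long ts = record.timestamp();
>           return ts >= 0 ? ts : System.currentTimeMillis();
>       }
>   }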
>
> One explanation for negative timestamps in repartition topics would be
> that the on-disk message format of the topic was not upgraded to the
> 0.10 format -- this is a requirement for storing timestamps. Thus, even
> if a positive timestamp is included in the records written into the
> repartition topics, the timestamps are not stored. When the records are
> read back, the broker just puts a `-1` into each record.
>
> Thus, you should upgrade the message format to 0.10+ to allow Kafka
> Streams to "internally forward" timestamps.
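>
> For reference, a sketch of the broker-side setting (assuming you manage
> server.properties; there is also a per-topic `message.format.version`
> override):
>
>   # server.properties (broker): store 0.10 timestamps in the log.
>   # This may have been pinned to an older format for old clients.
>   log.message.format.version=0.10.2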
>
> If you cannot upgrade the message format, one workaround would be to
> write to the repartition topics explicitly via `to()` and read them
> back explicitly via `.stream()`. In this case, the topics are treated
> as source topics and the WallclockTimestampExtractor will be applied.
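>
> A sketch of that workaround, assuming the 1.0 DSL (topic names and the
> re-keying are hypothetical, and the explicit topic must be created
> beforehand, since it is not managed as an internal topic):
>
>   import org.apache.kafka.streams.StreamsBuilder;
>   import org.apache.kafka.streams.kstream.KStream;
>
>   StreamsBuilder builder = new StreamsBuilder();
>   KStream<String, String> input = builder.stream("input-topic");
>
>   // Write the re-keyed stream to an explicitly named topic instead of
>   // letting Streams create an internal repartition topic ...
>   input.selectKey((key, value) -> value)   // hypothetical re-keying
>        .to("my-app-manual-repartition");
>
>   // ... and read it back: it is now a source topic, so the configured
>   // WallclockTimestampExtractor is applied to it as well.
>   KStream<String, String> repartitioned =
>           builder.stream("my-app-manual-repartition");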
>
>
> -Matthias
>
>
> On 3/22/18 7:28 AM, Shelan Perera wrote:
> > Hi,
> >
> > We are trying to run a Kafka Streams application against a Kafka
> > cluster, and some of the incoming messages have negative timestamps
> > because some of the producers are using an older version of Kafka.
> >
> > Therefore we used WallclockTimestampExtractor to patch those timestamps.
> > But we also read in the documentation that this will have no effect on
> > the topics created internally by Streams (repartition topics).
> >
> > It is strange that the internal records have negative timestamps, as we
> > are using the latest Kafka version for the Kafka Streams application.
> >
> > Error message we get
> > ======================
> >
> > org.apache.kafka.streams.processor.FailOnInvalidTimestamp - Input record
> > ConsumerRecord(topic = my-app-KSTREAM-TRANSFORM-0000000009-repartition,
> > partition = 6, offset = 18, CreateTime = -1, serialized key size = 8,
> > serialized value size = 104, headers = RecordHeaders(headers = [],
> > isReadOnly = false), key = 13143752, value = <our message>) has invalid
> > (negative) timestamp. Possibly because a pre-0.10 producer client was
> > used to write this record to Kafka without embedding a timestamp, or
> > because the input topic was created before upgrading the Kafka cluster
> > to 0.10+. Use a different TimestampExtractor to process this data.
> >
> > Properties in the code
> > =======================
> >
> > kafkaStreamsProperties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >     WallclockTimestampExtractor.class);
> > kafkaStreamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >     Serdes.String().getClass());
> > kafkaStreamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> >     Serdes.String().getClass());
> > kafkaStreamsProperties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
> >     LogAndContinueExceptionHandler.class);
> >
> > Kafka cluster version : kafka_2.12-0.10.2.1
> > Kafka version in streams application : 1.0.0
> >
> > We have the same Streams application running on two instances,
> > consuming messages from a Kafka cluster.
> >
> > What could be the reason the Streams application fails on negative
> > timestamps while we have used the WallclockTimestampExtractor?
> >
> > Thank you,
> >
> > Best Regards,
> > Shelan Perera
> >
>
>
