Hi Frank,

Yes, the retention policy is based on the embedded record timestamps and
not on system time. Thus, if you send messages with an old timestamp,
they can trigger log/segment rolling and deletion. For example, with a
36h retention, a segment whose newest record timestamp is three days old
is eligible for deletion right away (as soon as it is no longer the
active segment).

>> I see that the repartition topics have timestamp.type = CreateTime, does
>> that mean it uses the timestamp of the
>> original message? 

Yes. That's the default setting on the broker side. For Streams, we
maintain a so-called "stream time" that is computed based on the input
record timestamps. This "stream time" is used to set the timestamp for
records that are written by Streams (so it's more or less the timestamp
of the input records).
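
If you want to double-check what your repartition topic actually
contains, you can read a few records with a plain consumer and print the
embedded timestamps. A quick diagnostic sketch (the bootstrap server and
topic name are placeholders, and it assumes a recent Java client):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TimestampProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-probe");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // placeholder name -- use your actual "...-repartition" topic
            consumer.subscribe(Collections.singletonList("my-app-some-repartition"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> r : records) {
                long ageHours = (System.currentTimeMillis() - r.timestamp()) / 3_600_000L;
                System.out.printf("offset=%d type=%s timestamp=%d age=%dh%n",
                        r.offset(), r.timestampType(), r.timestamp(), ageHours);
            }
        }
    }
}

If the printed ages already exceed your 36h retention, that explains the
immediate deletion you are seeing.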

>> Shouldn't that be LogAppendTime for repartition topics?

No. Streams needs to preserve the original timestamp to guarantee
correct semantics for downstream windowed operations. Thus, it should be
CreateTime -- if you switch to LogAppendTime, you might break your
application logic and get wrong results.
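
To make that concrete, here is a minimal, hypothetical topology sketch
(topic name and window size are made up, and it uses the current DSL
rather than the API that was around when this thread was written): a
windowed count downstream of a re-keying step. The window a record falls
into is determined by its embedded timestamp, so if the repartition
topic rewrote timestamps to broker append time, old records would all
land in "current" windows instead of the ones they belong to.

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("db-changes"); // placeholder topic

        events
            // re-keying forces a repartition topic; the original record
            // timestamps are carried along into that topic
            .selectKey((key, value) -> value)
            .groupByKey()
            // the embedded timestamp decides which 1-hour window a record
            // belongs to -- LogAppendTime would effectively assign "now"
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count();

        System.out.println(builder.build().describe());
    }
}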

>> Or is there a way to configure that?

You can configure this on a per-topic basis on the brokers.
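
Just for reference, such per-topic overrides can be set with the
kafka-configs tool or programmatically via the Java admin client (a
newer API than the clients discussed in this thread). A rough sketch --
the topic name is a placeholder, and note that it sets retention.ms
(the safer knob for your problem) rather than the timestamp type:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicConfigOverride {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // placeholder name -- use your actual "...-repartition" topic
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-app-some-repartition");

            // e.g. raise time-based retention to 72h
            AlterConfigOp setRetention = new AlterConfigOp(
                new ConfigEntry("retention.ms", String.valueOf(72L * 60 * 60 * 1000)),
                AlterConfigOp.OpType.SET);

            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singletonList(setRetention)))
                .all()
                .get();
        }
    }
}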

>> Hacking into my Kafka Streams code to force it to use LogAppendTime
>> seems to solve my problem, but that appears to take a huge toll on the
>> brokers. Throughput plummets, and I don't really know why.

I am not sure what you mean by this. As it's a topic config, I don't
understand how you can force this from within your Streams application.


IMHO, you have multiple options though:
 - increase the retention time for your re-partitioning topics
 - you could change the retention policy to a number of bytes instead of
time for the re-partitioning topics
 - you can implement a custom timestamp extractor and adjust the
timestamps accordingly ("stream time" is based on whatever the timestamp
extractor returns) -- see the sketch below
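
As a rough illustration of the last option (class name and the 24h
bound are made up, not a recommendation for your exact setup; it assumes
the two-argument extractor interface from 0.10.2+):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: use the embedded timestamp, but never let it
// fall further behind wall-clock time than MAX_AGE_MS, so repartition
// segments don't immediately exceed their time-based retention.
public class BoundedLagTimestampExtractor implements TimestampExtractor {

    private static final long MAX_AGE_MS = 24L * 60 * 60 * 1000; // made-up bound

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long now = System.currentTimeMillis();
        long embedded = record.timestamp();
        if (embedded < 0) {
            // no valid embedded timestamp -- fall back to wall-clock time
            return now;
        }
        return Math.max(embedded, now - MAX_AGE_MS);
    }
}

You would plug it in via the timestamp extractor config of your Streams
application (default.timestamp.extractor in newer versions). Keep in
mind that shifting timestamps this way also shifts which windows records
fall into, so you trade the retention problem against windowing
semantics.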

However, if you have records with old timestamps, I am wondering why
they are not truncated in your input topic? Do you not face the same
issue there?


-Matthias





On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> Hi Kafka people,
> 
> I'm running an application that pushes database changes into Kafka topics.
> I'm also running a Kafka Streams application
> that listens to these topics, groups them using the high-level API, and
> inserts them into another database.
> 
> All topics are compacted, with the exception of the 'repartition topics',
> which are configured to be retained for 36 hours.
> 
> Note that the changes in the original Kafka topics can be old (generally
> more than 36 hours), as they only change when
> the data changes.
> 
> When I start an instance of the Kafka Streams application, I see the
> repartition topics being deleted immediately,
> sometimes before they are processed, and it looks like the repartition
> messages use the same timestamp as the
> original message.
> 
> I see that the repartition topics have timestamp.type = CreateTime, does
> that mean it uses the timestamp of the
> original message? Shouldn't that be LogAppendTime for repartition topics?
> Or is there a way to configure that?
> 
> Hacking into my Kafka Streams code to force it to use LogAppendTime
> seems to solve my problem, but that appears to take a huge toll on the
> brokers. Throughput plummets, and I don't really know why.
> 
> Any ideas?
> 
> Frank
> 
