Hi,

While adding a new Streams-based microservice to an existing Kafka
infrastructure, I have run into some issues processing older data in existing
topics. I am uncertain of the exact cause of the problems, but am looking for
advice to clarify how things are supposed to work so I can eliminate
possibilities. The following is my hypothesis of what may be happening; maybe
somebody can tell me why what I describe is impossible, or whether this might
be a bug. This relates to Kafka 0.10.2.1 (Confluent distribution).



TL;DR: When (re)processing old data with Kafka Streams in a topology that
causes the stream to be repartitioned, the records in the -repartition topic
carry the timestamps of the original input records as extracted by the
timestamp extractor. The default retention of the -repartition topic is,
however, 7 days, allowing Kafka to delete data from the -repartition topic even
before the Streams application has had a chance to read it back in.





The situation is basically that I have a topic with existing records dating
back several months. Each record contains a timestamp and a client identifier
(among other things). The task is quite simple: produce an output topic that
contains the largest timestamp for each client. Distilled, it looks something
like this:

   class Record {
     ...
     long client;
     long time;
     ...
   }

   builder.stream(EARLIEST, Bytes(), Record(), "input")
          .map((hash, record) -> new KeyValue<>(record.client, record.time))
          .groupByKey(Long(), Long())
          .reduce(Long::max, "store")
          .to(Long(), Long(), "output");

where Bytes(), Long(), Record() return the appropriate Serdes. This takes each
input record, discards the original key, repartitions on the embedded client
id, runs a reduction keeping the largest timestamp, and stores the result back
into a topic. The repartitioning causes an internal topic to be created. This
topic has cleanup.policy=delete and the server's default retention of 7 days.
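
For reference, the surrounding wiring is nothing special; roughly like this
(the application id "max-time-app" and the broker address are made up here,
and as far as I can tell the repartition topic then gets its name from the
application id plus the store name, i.e. max-time-app-store-repartition):

   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "max-time-app");    // made-up id
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // made-up address

   KStreamBuilder builder = new KStreamBuilder();
   // ... topology from above built on this builder ...
   KafkaStreams streams = new KafkaStreams(builder, props);
   streams.start();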


I am using the default FailOnInvalidTimestamp timestamp extractor. As far as I
can determine, this causes the records in the -repartition topic to have the
same metadata timestamps as the input records. Also, as far as I can see in the
Kafka server-side code, log segments will be deleted once the largest timestamp
(as extracted from the records stored in the segment) is older than the
retention policy.
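
Put differently, my (simplified) understanding of the broker's time-based
retention check is something like the following - this is just an
illustration, not the actual Kafka code:

   // Simplified illustration: a segment becomes eligible for deletion once its
   // largest record timestamp is older than retention.ms, regardless of when
   // the segment was actually written.
   boolean eligibleForDeletion =
       System.currentTimeMillis() - segmentLargestTimestampMs > retentionMs;

With record timestamps that are several months old and a retention of 7 days,
that condition is true as soon as a repartition segment is rolled.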


This is where I wonder how this is supposed to work when ingesting months-old
data: it would appear that Kafka could start to delete segments of the
-repartition topic aggressively, as the timestamps are several months old. This
could happen even before Kafka Streams has had a chance to read the data back
in for the reduce operation. The Kafka server log would seem to support that
this happens, as I see several segments being created *and deleted* right after
the application was started:

   [2017-07-24 09:37:54,735] INFO Rolled new log segment for 'xxx-repartition-2' in 1 ms. (kafka.log.Log)
   [2017-07-24 09:37:54,735] INFO Scheduling log segment 0 for log xxx-repartition-2 for deletion. (kafka.log.Log)


This repeats quite a number of times for the first half hour or so, presumably
until the computation has caught up with newer data that didn't get deleted
right away. The client-side log also seemed to indicate that something like
this was happening:


   2017-07-24 09:45:02,550  INFO StreamThread stream-thread [StreamThread-1] no 
custom setting defined for topic xxx-repartition using original config earliest 
for offset reset


This message, too, repeated quite a number of times for the first half hour or
so. Looking at the Kafka Streams code, this message would get logged as a
result of the consumer failing due to an invalid offset.


Assuming my theory is correct, I could probably work around this by using a
WallclockTimestampExtractor (something I will test tomorrow). However, one of
the use cases often repeated in the Kafka material is that one can reprocess
old data - surely it must be possible to reprocess using a timestamp extractor
that reflects the original event time, not the current processing time?
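
For the test I was simply going to swap the extractor via the Streams
configuration, roughly like this (just a sketch; TIMESTAMP_EXTRACTOR_CLASS_CONFIG
is, as far as I can tell, the right property name in 0.10.2):

   Properties props = new Properties();
   // ... other Streams settings as before ...
   props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
             WallclockTimestampExtractor.class.getName());

That should make the repartition records carry the current wall-clock time and
thus survive the 7-day retention, but it gives up the event-time semantics I
actually want for the reduce.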


I tried to Google for information about retention time and internal topics. The
closest thing I could find is that Kafka 0.11 has gained support for
applications asking for records before a particular offset to be deleted; a
future Kafka Streams version could use this to eliminate the need for a
retention time on the internal topics and thus resolve the problem.



Cheers,

Gerd
