Hi,

The behavior you describe is by design. To process old data, you need to
manually increase the retention time (retention.ms) of the -repartition
topics so that it covers the age of the records you are reprocessing.
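
As a rough guide for choosing the value: the retention should cover the age of the oldest record you intend to reprocess, plus some headroom. The sketch below is plain Java with no Kafka dependency. The class and method names (RepartitionRetention, requiredRetentionMs), the six-month example and the one-day margin are all made up for illustration; the result would then be applied as retention.ms on the -repartition topic with whatever topic-config tooling your Kafka version provides.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not part of Kafka): estimates retention.ms for a -repartition topic. */
final class RepartitionRetention {

    /**
     * Returns a retention in milliseconds that covers the age of the oldest
     * record to be reprocessed, plus a safety margin.
     */
    static long requiredRetentionMs(Instant oldestRecord, Instant now, Duration margin) {
        if (oldestRecord.isAfter(now)) {
            // Timestamps ahead of the clock need no extra retention; keep only the margin.
            return margin.toMillis();
        }
        return Duration.between(oldestRecord, now).plus(margin).toMillis();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2017-07-25T00:00:00Z");
        // Assumption for the example: the input topic goes back six months.
        Instant oldest = Instant.parse("2017-01-25T00:00:00Z");
        long retentionMs = requiredRetentionMs(oldest, now, Duration.ofDays(1));
        System.out.println("retention.ms=" + retentionMs);
    }
}
```

Once the application has caught up with current data, the retention can be lowered again if disk usage on the internal topic is a concern.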


-Matthias

On 7/25/17 7:17 AM, Gerd Behrmann wrote:
> Hi,
> 
> While adding a new Streams based micro service to an existing Kafka 
> infrastructure, I have run into some issues processing older data in existing 
> topics. I am uncertain of the exact cause of the problems, but am looking for 
> advice to clarify how things are supposed to work to eliminate possibilities. 
> The following is my hypothesis of what may be happening; maybe somebody can 
> tell me why what I describe is impossible - or whether this might be a bug. 
> The following relates to Kafka 0.10.2.1 (confluent distribution).
> 
> 
> 
> TL;DR: When (re)processing old data with Kafka Streams in a topology that 
> causes the stream to be repartitioned, the records in the -repartition topic 
> carry the timestamp of the original input records as extracted by the 
> timestamp extractor. The default retention policy of the -repartition topic 
> is however 7 days, allowing Kafka to delete data from the -repartition topic 
> even before the Streams application has a chance of reading it back in.
> 
> 
> 
> 
> 
> The situation is basically that I have a topic with existing records dating 
> back several months. Each record contains a timestamp and a client identifier 
> (among other things). The task is quite simple: Produce an output topic that 
> contains the largest timestamp for each client. Distilled, this looks 
> something like this:
> 
>    class Record {
>      ...
>      long client;
>      long time;
>      ...
>    }
> 
>    builder.stream(EARLIEST, Bytes(), Record(), "input")
>           .map((hash, record) -> new KeyValue<>(record.client, record.time))
>           .groupByKey(Long(), Long())
>           .reduce(Long::max, "store")
>           .to(Long(), Long(), "output");
> 
> where Bytes(), Long(), Record() return the appropriate Serde. This takes each 
> input record and throws the original key away, repartitions on the embedded 
> id, runs a reduction operation keeping the largest timestamp, and stores the 
> result back into a topic. The repartitioning causes an internal topic to be 
> created. This topic will have a cleanup.policy=delete and the server default 
> retention policy of 7 days. 
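
To check the intended result without a broker, the per-client reduction can be mimicked in plain Java. This is only a model of what map + groupByKey + reduce(Long::max) computes, not Kafka Streams code; the names LatestPerClient and InputRecord are hypothetical stand-ins, and only the two record fields used by the topology are kept.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the topology's result: the largest timestamp seen per client. */
final class LatestPerClient {

    /** Minimal stand-in for the Record class in the message; other fields are omitted. */
    static final class InputRecord {
        final long client;
        final long time;

        InputRecord(long client, long time) {
            this.client = client;
            this.time = time;
        }
    }

    /** Re-keys each record by client id and keeps the maximum time per key. */
    static Map<Long, Long> latestPerClient(List<InputRecord> records) {
        Map<Long, Long> latest = new HashMap<>();
        for (InputRecord r : records) {
            // merge() stores the value for a new key and applies Long::max
            // when the key already has one, much like reduce() does per key.
            latest.merge(r.client, r.time, Long::max);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<InputRecord> input = Arrays.asList(
                new InputRecord(1L, 100L),
                new InputRecord(2L, 50L),
                new InputRecord(1L, 70L));
        System.out.println(latestPerClient(input));
    }
}
```

If records routed through the -repartition topic are deleted before the reduce step reads them, the real output can be missing clients or report a smaller maximum than this model would.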
> 
> 
> I am using the default FailOnInvalidTimestamp timestamp extractor. As far as 
> I can determine, this causes the records in the -repartition topic to have the 
> same metadata timestamps as the input records. Also, as far as I can see in 
> the Kafka server side code, log segments will be deleted once the largest 
> timestamp (as extracted from the records stored in the segment) is older than 
> the retention policy. 
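
The deletion rule described in this paragraph can be written down as a small predicate. This is a plain-Java model of the rule as stated in the message, not the broker's actual code; SegmentRetention, isExpired and the sample ages are hypothetical.

```java
import java.time.Duration;

/** Hypothetical model of time-based retention; not the broker's implementation. */
final class SegmentRetention {

    /**
     * A segment becomes eligible for deletion once its largest record timestamp
     * is older than the retention period, measured against the current time.
     */
    static boolean isExpired(long largestTimestampMs, long nowMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = Duration.ofDays(7).toMillis(); // the 7-day default mentioned above
        long nowMs = System.currentTimeMillis();
        long ninetyDaysOld = nowMs - Duration.ofDays(90).toMillis();
        long oneHourOld = nowMs - Duration.ofHours(1).toMillis();

        // A freshly written segment that only holds months-old timestamps is already expired.
        System.out.println("90-day-old timestamps expired: " + isExpired(ninetyDaysOld, nowMs, retentionMs));
        // A segment whose newest timestamp is recent survives.
        System.out.println("1-hour-old timestamps expired: " + isExpired(oneHourOld, nowMs, retentionMs));
    }
}
```

Under this model the age of the segment file itself plays no role, only the timestamps carried by the records, which is why a segment written seconds ago can still qualify for deletion.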
> 
> 
> This is where I wonder how this is supposed to work when ingesting months old 
> data: it would appear that Kafka could start to delete segments of the 
> -repartition topic aggressively as the timestamps are several months old. This 
> could happen even before Kafka Streams had a chance to read the data back in 
> for the reduce operation. The Kafka server log would seem to support that 
> this happens as I see several segments being created *and deleted* right after 
> the application was started:
> 
>    [2017-07-24 09:37:54,735] INFO Rolled new log segment for 
> 'xxx-repartition-2' in 1 ms. (kafka.log.Log)
>    [2017-07-24 09:37:54,735] INFO Scheduling log segment 0 for log 
> xxx-repartition-2 for deletion. (kafka.log.Log)
> 
> 
> This repeats quite a number of times for the first half hour or so - 
> presumably until the computation has caught up with newer data that didn’t 
> get deleted right away. Also the client side log seemed to indicate that 
> something like this was happening:
> 
> 
>    2017-07-24 09:45:02,550  INFO StreamThread stream-thread [StreamThread-1] 
> no custom setting defined for topic xxx-repartition using original config 
> earliest for offset reset
> 
> 
> This message too repeated quite a number of times for the first half hour or 
> so. Looking at the Kafka Streams code, this message would get logged as a 
> result of the consumer failing due to an invalid offset.
> 
> 
> Assuming my theory is correct, I could probably solve this problem by using a 
> WallclockTimestampExtractor (something I will test tomorrow). However one of 
> the use cases often repeated in the Kafka material is that one can reprocess 
> old data - surely it must be possible to reprocess using a timestamp 
> extractor that reflects the original time, not the current processing time?
> 
> 
> I tried to Google for information about retention time and internal topics. 
> The closest thing I could find is that Kafka 0.11 has gained support for 
> applications asking for records before a particular offset to be deleted; a 
> future Kafka Streams could use this to eliminate the need for having a 
> retention time on the internal topic and thus resolve the problem.
> 
> 
> 
> Cheers,
> 
> Gerd
> 
