So, in our setup, to provide the historic data on the platform, we would
have to define all topics with a retention period matching the business time
for which we want to keep the data. On the intermediate topics, however, we
would only need the data to stay around long enough to be processed.
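
As a concrete sketch (topic names and retention values below are just
placeholders, set with the standard topic-config tool):

  # long-lived topics: business retention, e.g. 90 days
  kafka-configs --zookeeper localhost:2181 --alter --entity-type topics \
    --entity-name cdr-history --add-config retention.ms=7776000000

  # intermediate topic: only long enough to get processed, e.g. 1 day
  kafka-configs --zookeeper localhost:2181 --alter --entity-type topics \
    --entity-name cdr-intermediate --add-config retention.ms=86400000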

Could we achieve this by increasing log.segment.delete.delay.ms to e.g. 1
day? Would that give us a window of a day to process the data on the
intermediate topics? Or is this just wishful thinking?
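
To make that concrete, the broker setting we have in mind would be something
like this in server.properties (value is 1 day in ms; whether it actually
behaves that way is exactly the question):

  # time between a segment being scheduled for deletion and the file
  # actually being removed from disk (broker default is 60000 ms)
  log.segment.delete.delay.ms=86400000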

Thanks again!
-wim

On Fri, 15 Dec 2017 at 14:23 Wim Van Leuven <wim.vanleu...@highestpoint.biz>
wrote:

> Is it really? I checked some records on the Kafka topics using command-line
> consumers to print keys and timestamps, and the timestamps were logged as
> CreateTime:1513332523181
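>
> (Roughly the command used, quoted from memory -- broker address is a
> placeholder:
>
>   kafka-console-consumer --bootstrap-server localhost:9092 \
>     --topic cdr-raw-arch --from-beginning --max-messages 10 \
>     --property print.timestamp=true --property print.key=true
>
> which prints each record prefixed with CreateTime:<timestamp>.)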
>
> But that would explain the issue. I'll adjust the retention on the topic
> and rerun.
>
> Thank you already for the insights!
> -wim
>
> On Fri, 15 Dec 2017 at 14:08 Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> It is likely due to the timestamps you are extracting and using as the
>> record timestamp. Kafka uses the record timestamps for retention. I
>> suspect
>> this is causing your segments to roll and be deleted.
>>
>> Thanks,
>> Damian
>>
>> On Fri, 15 Dec 2017 at 11:49 Wim Van Leuven <
>> wim.vanleu...@highestpoint.biz>
>> wrote:
>>
>> > Hello all,
>> >
>> > We are running some Kafka Streams processing apps over Confluent OS
>> > (v3.2.0) and I'm seeing unexpected but 'consistent' behaviour regarding
>> > segment and index deletion.
>> >
>> > So, we have a topic 'input' that contains about 30M records to ingest.
>> > A first processor transforms the data and pipes it onto a second,
>> > intermediate topic. A second processor picks up those records, processes
>> > them, and sends them out.
>> >
>> > On our test environment the intermediate topic was set up with a
>> > retention of 1 hour, because we only need to keep the data while it is
>> > being processed.
>> >
>> > On a test run we saw the second processor exit with exceptions because
>> > it couldn't read its offsets. We do not reset offsets automatically,
>> > because that situation should not happen.
>> >
>> > org.apache.kafka.streams.errors.StreamsException: No valid committed
>> > offset found for input topic cdr-raw-arch (partition 1) and no valid
>> > reset policy configured. You need to set configuration parameter
>> > "auto.offset.reset" or specify a topic specific reset policy via
>> > KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or
>> > KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
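>> >
>> > For context, this is what that exception is pointing at -- a minimal
>> > sketch only, assuming the 0.10.2-era Streams API (application id and
>> > broker address are placeholders). We deliberately leave this unset,
>> > because in our case missing offsets should simply not happen:
>> >
>> >   import java.util.Properties;
>> >   import org.apache.kafka.clients.consumer.ConsumerConfig;
>> >   import org.apache.kafka.streams.StreamsConfig;
>> >
>> >   Properties props = new Properties();
>> >   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdr-processor");  // placeholder
>> >   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
>> >   // Consumer configs given to StreamsConfig are forwarded to the embedded
>> >   // consumers, so this sets a global fallback reset policy for all topics.
>> >   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >   // Alternatively, a topic-specific policy can be passed to the
>> >   // KStreamBuilder#stream(...) / #table(...) overloads named above.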
>> >
>> > As we thought the topic data was expiring (processing takes longer than
>> > 1 hour), we changed the topic to retain the data for 1 day.
>> >
>> > On rerun, however, we saw exactly the same behaviour. That's why I'm
>> > calling it 'consistent behaviour' above.
>> >
>> > In the server logs, we see that Kafka is rolling new segments but
>> > immediately scheduling them for deletion.
>> >
>> > [2017-12-15 11:01:46,992] INFO Rolled new log segment for
>> > 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
>> > [2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log
>> > cdr-raw-arch-1 for deletion. (kafka.log.Log)
>> > [2017-12-15 11:01:46,995] INFO Rolled new log segment for
>> > 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
>> > [2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log
>> > cdr-raw-arch-0 for deletion. (kafka.log.Log)
>> > [2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
>> > cdr-raw-arch-1. (kafka.log.Log)
>> > [2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
>> > cdr-raw-arch-0. (kafka.log.Log)
>> > [2017-12-15 11:02:47,170] INFO Deleting index
>> > /data/4/kafka/cdr-raw-arch-1/00000000000007330185.index.deleted
>> > (kafka.log.OffsetIndex)
>> > [2017-12-15 11:02:47,171] INFO Deleting index
>> > /data/4/kafka/cdr-raw-arch-1/00000000000007330185.timeindex.deleted
>> > (kafka.log.TimeIndex)
>> > [2017-12-15 11:02:47,172] INFO Deleting index
>> > /data/3/kafka/cdr-raw-arch-0/00000000000007335872.index.deleted
>> > (kafka.log.OffsetIndex)
>> > [2017-12-15 11:02:47,173] INFO Deleting index
>> > /data/3/kafka/cdr-raw-arch-0/00000000000007335872.timeindex.deleted
>> > (kafka.log.TimeIndex)
>> >
>> >
>> > However, I do not understand this behaviour: why is Kafka deleting the
>> > data on the intermediate topic before it has been processed? Almost
>> > immediately, even?
>> >
>> > We do use timestamp extractors to pull business time from the records.
>> > Is that taken into account for retention? Or is retention based only on
>> > the modification times of the segment files on disk?
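>> >
>> > For reference, our extractor is essentially of this shape -- a simplified
>> > sketch, not the real class (CdrRecord and getEventTime() are placeholders
>> > for our actual payload type, and I'm quoting the extract() signature from
>> > memory). It is registered via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG:
>> >
>> >   import org.apache.kafka.clients.consumer.ConsumerRecord;
>> >   import org.apache.kafka.streams.processor.TimestampExtractor;
>> >
>> >   public class BusinessTimeExtractor implements TimestampExtractor {
>> >       @Override
>> >       public long extract(ConsumerRecord<Object, Object> record,
>> >                           long previousTimestamp) {
>> >           // Pull the business time out of the record value and use it
>> >           // as the record timestamp (epoch millis).
>> >           CdrRecord cdr = (CdrRecord) record.value();
>> >           return cdr.getEventTime();
>> >       }
>> >   }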
>> >
>> > Thank you for shedding any light on this problem!
>> >
>> > Kind regards!
>> > -wim
>> >
>>
>
