Hello all,

We are running some Kafka Streams processing apps on Confluent OS
(v3.2.0), and I'm seeing unexpected but 'consistent' behaviour regarding
segment and index deletion.

We have a topic 'input' that contains about 30M records to ingest. A
first processor transforms the records and pipes them to a second,
intermediate topic. A second processor picks up the records, processes
them and sends them out.

In our test environment, the intermediate topic was set up with a
retention of 1 hour, because we only need to keep the data while it is
being processed.

During a test run, the second processor exited with exceptions saying it
could not find a valid committed offset. We do not reset offsets
automatically, because this should not happen.

org.apache.kafka.streams.errors.StreamsException: No valid committed offset
found for input topic cdr-raw-arch (partition 1) and no valid reset policy
configured. You need to set configuration parameter "auto.offset.reset" or
specify a topic specific reset policy via
KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or
KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
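
To make the failure mode concrete, here is a simplified model of the
decision that message describes. It is not Kafka's code: the class, method
and enum names are hypothetical, and it assumes the committed offset became
invalid because the segment holding it was deleted. A committed offset is
only usable while it lies between the log start offset and the log end
offset; otherwise the consumer needs a reset policy (the `auto.offset.reset`
values `earliest` / `latest`) or it fails.

```java
// Simplified, hypothetical model of the check behind the exception above:
// a committed offset is only usable if it still points into the retained log.
public class CommittedOffsetCheck {

    /** Hypothetical reset policies mirroring the "auto.offset.reset" values. */
    enum ResetPolicy { NONE, EARLIEST, LATEST }

    /**
     * Returns the offset a consumer would resume from, or throws if the
     * committed offset is no longer in the log and no reset policy is set.
     */
    static long resumeOffset(long committed, long logStartOffset,
                             long logEndOffset, ResetPolicy policy) {
        boolean valid = committed >= logStartOffset && committed <= logEndOffset;
        if (valid) {
            return committed;
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                throw new IllegalStateException(
                    "No valid committed offset and no reset policy configured");
        }
    }

    public static void main(String[] args) {
        // Hypothetical numbers: the consumer committed offset 1000, but the
        // segments holding offsets below 5000 have already been deleted.
        long committed = 1_000, logStart = 5_000, logEnd = 9_000;
        System.out.println(resumeOffset(committed, logStart, logEnd, ResetPolicy.EARLIEST)); // 5000
        try {
            resumeOffset(committed, logStart, logEnd, ResetPolicy.NONE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With `NONE` (our setup), a deleted committed offset ends the processor
instead of silently skipping or replaying data.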

Since we suspected that the topic data was expiring (processing takes
longer than 1 hour), we changed the topic to retain the data for 1 day.
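
For reference, the two retention settings correspond to these values of
the topic-level `retention.ms` config. This is a small sketch; the helper
name `retentionConfig` is hypothetical. On a 0.10.x cluster such a value
would typically be applied with `kafka-configs --alter --add-config
retention.ms=...`.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Builds the topic-level "retention.ms" override for a given duration.
public class RetentionSettings {

    static Map<String, String> retentionConfig(long amount, TimeUnit unit) {
        Map<String, String> config = new TreeMap<>();
        // retention.ms is expressed in milliseconds
        config.put("retention.ms", Long.toString(unit.toMillis(amount)));
        return config;
    }

    public static void main(String[] args) {
        System.out.println(retentionConfig(1, TimeUnit.HOURS)); // {retention.ms=3600000}
        System.out.println(retentionConfig(1, TimeUnit.DAYS));  // {retention.ms=86400000}
    }
}
```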

On the rerun, however, we saw exactly the same behaviour, which is why I
called it 'consistent' above.

In the server logs, we see Kafka rolling segments and then immediately
scheduling them for deletion:

[2017-12-15 11:01:46,992] INFO Rolled new log segment for 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
[2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log cdr-raw-arch-1 for deletion. (kafka.log.Log)
[2017-12-15 11:01:46,995] INFO Rolled new log segment for 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
[2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log cdr-raw-arch-0 for deletion. (kafka.log.Log)
[2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log cdr-raw-arch-1. (kafka.log.Log)
[2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log cdr-raw-arch-0. (kafka.log.Log)
[2017-12-15 11:02:47,170] INFO Deleting index /data/4/kafka/cdr-raw-arch-1/00000000000007330185.index.deleted (kafka.log.OffsetIndex)
[2017-12-15 11:02:47,171] INFO Deleting index /data/4/kafka/cdr-raw-arch-1/00000000000007330185.timeindex.deleted (kafka.log.TimeIndex)
[2017-12-15 11:02:47,172] INFO Deleting index /data/3/kafka/cdr-raw-arch-0/00000000000007335872.index.deleted (kafka.log.OffsetIndex)
[2017-12-15 11:02:47,173] INFO Deleting index /data/3/kafka/cdr-raw-arch-0/00000000000007335872.timeindex.deleted (kafka.log.TimeIndex)
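
These log lines are consistent with time-based retention. Below is a
simplified model of that check (hypothetical class names, not Kafka's
source), assuming the behaviour of Kafka 0.10.1 and later: a segment
becomes eligible for deletion once the broker's current time exceeds the
largest record timestamp in the segment by more than `retention.ms`. When
every segment is eligible, the broker rolls a new active segment first so
the old one can be deleted, which would match the "Rolled new log segment"
/ "Scheduling log segment ... for deletion" pairs above. If the records on
the intermediate topic carry old business timestamps (an assumption, not
verified here), they would be expired as soon as the check runs,
regardless of the 1-hour or 1-day setting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Simplified model of time-based log retention (not Kafka's source code).
public class TimeRetentionModel {

    /** Hypothetical stand-in for a log segment. */
    static final class Segment {
        final long baseOffset;     // first offset stored in the segment
        final long maxTimestampMs; // largest record timestamp in the segment

        Segment(long baseOffset, long maxTimestampMs) {
            this.baseOffset = baseOffset;
            this.maxTimestampMs = maxTimestampMs;
        }
    }

    /** Returns the segments whose newest record is older than the retention window. */
    static List<Segment> deletable(List<Segment> segments, long nowMs, long retentionMs) {
        List<Segment> expired = new ArrayList<>();
        for (Segment segment : segments) {
            // In this model, age is measured from record timestamps, not file mtimes.
            if (nowMs - segment.maxTimestampMs > retentionMs) {
                expired.add(segment);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long retentionMs = TimeUnit.DAYS.toMillis(1); // the 1-day setting from the rerun

        List<Segment> segments = new ArrayList<>();
        // Hypothetical segment whose records carry business time from 30 days ago.
        segments.add(new Segment(0L, now - TimeUnit.DAYS.toMillis(30)));
        // Hypothetical segment whose records were stamped a few minutes ago.
        segments.add(new Segment(1_000L, now - TimeUnit.MINUTES.toMillis(5)));

        for (Segment segment : deletable(segments, now, retentionMs)) {
            System.out.println("deletable: segment " + segment.baseOffset); // deletable: segment 0
        }
    }
}
```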


What I do not understand is why Kafka deletes the data on the intermediate
topic before it has been processed, and almost immediately at that.

We do use timestamp extractors to pull the business time out of the
records. Is that timestamp taken into account for retention, or is
retention based only on the times of the files on disk?

Thanks in advance to anyone who can shed some light on this problem!

Kind regards!
-wim
