Hi All,
I have a KStreams application running inside a Docker container which uses a 
persistent key-value store. 

I have configured state.dir with a value of /tmp/kafka-streams (which is the 
default).
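
For completeness, here is a minimal sketch of the relevant settings (using 
plain string keys; the class and method names are just illustrative):

```java
import java.util.Properties;

public class StreamsProps {
    // Minimal sketch of the relevant settings; the real app sets more.
    public static Properties build() {
        Properties props = new Properties();
        // application.id names the subdirectory created under state.dir
        props.put("application.id", "myapp");
        // state.dir is where Streams keeps its RocksDB data, .lock and
        // .checkpoint files (default: /tmp/kafka-streams)
        props.put("state.dir", "/tmp/kafka-streams");
        return props;
    }
}
```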

When I start this container using "docker run", I mount /tmp/kafka-streams to a 
directory on my host machine, say /mnt/storage/kafka-streams.

My application.id is "myapp". I have 288 partitions in my input topic, which 
means my state store / changelog topic will also have that many partitions. 
Accordingly, when I start my Docker container, I see folders named after the 
task partitions, such as 0_1, 0_2, ..., 0_288, under 
/mnt/storage/kafka-streams/myapp/

When I shut down my application, I do not see a checkpoint file in any of the 
partition directories.

And when I restart my application, it starts fetching the records from the 
changelog topic rather than reading from local disk. I suspect this is because 
there is no .checkpoint file in any of the partition directories. 
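
A quick way to confirm this is to scan the task directories for a .checkpoint 
file after shutdown. A hypothetical diagnostic helper along these lines (the 
class and method names are mine, not from Kafka):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CheckpointScan {
    // Returns the task directories under appStateDir (e.g.
    // /mnt/storage/kafka-streams/myapp) that do NOT contain a
    // .checkpoint file.
    public static List<Path> tasksMissingCheckpoint(Path appStateDir)
            throws IOException {
        try (Stream<Path> dirs = Files.list(appStateDir)) {
            return dirs.filter(Files::isDirectory)
                       .filter(d -> !Files.exists(d.resolve(".checkpoint")))
                       .sorted()
                       .collect(Collectors.toList());
        }
    }
}
```

In my case this would list every task directory, which matches the restore 
behaviour I see on restart.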

This is what I see in the startup log. It seems to be bootstrapping the entire 
state store from the changelog topic, i.e. performing network I/O rather than 
reading what is already on disk:

"
2022-05-31T12:08:02.791 
[mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread 
[myapp-f6900c0a-50ca-43a0-8a4b-95eaa
d9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did not 
find checkpoint offsets while stores are not empty, since under EOS it has the 
risk of getting uncommitte
d data in stores we have to treat it as a task corruption error and wipe out 
the local state of task 0_170 before re-bootstrapping
2022-05-31T12:08:02.791 
[myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad
9e5093-StreamThread-122] Detected the states of tasks [0_170] are corrupted. 
Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] are 
corrupted and hence needs to be re-initialized
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
        at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
"

1) Should I expect to see a checkpoint file in each of the partition 
directories under /mnt/storage/kafka-streams/myapp/ when I shut down my 
application?

2) Is this an issue because I am running my KStreams app inside a Docker 
container? If there were a permissions issue, I would have expected problems 
creating the other files as well, such as .lock or the rocksdb folder (and its 
contents).

My runtime environment is Docker 1.13.1 on RHEL 7.
