Hi All, I have a KStreams application running inside a Docker container which uses a persistent key-value store.
I have configured state.dir with a value of /tmp/kafka-streams (which is the default). When I start this container using "docker run", I mount /tmp/kafka-streams to a directory on my host machine, for example /mnt/storage/kafka-streams. My application.id is "myapp". I have 288 partitions in my input topic, which means my state store / changelog topic will also have that many partitions. Accordingly, when I start my Docker container, I see that there is a folder for each partition, such as 0_1, 0_2....0_288, under /mnt/storage/kafka-streams/myapp/

When I shut down my application, I do not see any checkpoint file in any of the partition directories. And when I restart my application, it starts fetching the records from the changelog topic rather than reading from local disk. I suspect this is because there is no .checkpoint file in any of the partition directories.

This is what I see in the startup log. It seems to be bootstrapping the entire state store from the changelog topic, i.e. performing network I/O rather than reading from what is on disk:

"
2022-05-31T12:08:02.791 [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 0_170 before re-bootstrapping
2022-05-31T12:08:02.791 [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] Detected the states of tasks [0_170] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] are corrupted and hence needs to be re-initialized
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
"

1) Should I expect to see a checkpoint file in each of the partition directories under /mnt/storage/kafka-streams/myapp/ when I shut down my application?

2) Is this an issue because I am running my KStreams app inside a Docker container? If there were permission issues, I would have expected to see problems creating the other files as well, such as .lock or the rocksdb folder (and its contents).

My runtime environment is Docker 1.13.1 on RHEL 7.
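
To make the check behind question 1 concrete, here is a minimal sketch of how the partition directories could be scanned for a missing .checkpoint file after shutdown. It is not part of my application and uses only the JDK. It assumes the task directories sit directly under <state.dir>/<application.id> and are named like 0_170; the class name CheckpointScan and the method tasksMissingCheckpoint are illustrative only.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CheckpointScan {

    // Returns the names of task directories (for example "0_170") directly
    // under appDir that do not contain a ".checkpoint" file.
    // Assumption: task directories are named <digits>_<digits>.
    static List<String> tasksMissingCheckpoint(Path appDir) throws IOException {
        List<String> missing = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(appDir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                // Skip anything that is not a task directory.
                if (!Files.isDirectory(entry) || !name.matches("\\d+_\\d+")) {
                    continue;
                }
                if (!Files.exists(entry.resolve(".checkpoint"))) {
                    missing.add(name);
                }
            }
        }
        Collections.sort(missing); // plain string order, for stable output
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Default is the host-side path from this post; pass another path as args[0].
        Path appDir = Paths.get(args.length > 0 ? args[0] : "/mnt/storage/kafka-streams/myapp");
        if (!Files.isDirectory(appDir)) {
            System.out.println("Not a directory: " + appDir);
            return;
        }
        List<String> missing = tasksMissingCheckpoint(appDir);
        System.out.println(missing.size() + " task directories without .checkpoint: " + missing);
    }
}
```

Running this against the host-side mount right after stopping the container, and again just before restarting it, would show whether the files are never written or are lost somewhere in between.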