Thanks, John! It seems that if I send a TERM signal to my KStreams application, which is running inside a Docker container, it shuts down cleanly, and the checkpoint files are then created successfully. So I need to figure out how to send a TERM signal to the running Java KStreams application inside the Docker container. The application is launched by an entrypoint.sh script in the container. If I send a signal to the container using "docker kill", the signal is not passed on to the Java process that entrypoint.sh spawns.
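The JVM runs its registered shutdown hooks when it receives SIGTERM or SIGINT. It does not run them on SIGKILL, which is what `docker kill` sends unless `--signal` is given. The usual way to turn TERM into the clean shutdown John describes is therefore a hook that closes the streams instance and waits for it to finish.

The sketch below is a minimal illustration under one assumption: `StreamsApp` is a hypothetical stand-in for `org.apache.kafka.streams.KafkaStreams`, so the example compiles with the JDK alone. In the real application, the hook would call `KafkaStreams#close(Duration)` on your instance.

For the signal to reach the JVM at all, a common remedy is to start Java from entrypoint.sh with `exec java ...`. The JVM then replaces the shell process instead of running as a child that the shell never forwards signals to.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class CleanShutdown {

    // Hypothetical stand-in for org.apache.kafka.streams.KafkaStreams so this sketch
    // runs with only the JDK. Use your real KafkaStreams instance in the application.
    static class StreamsApp {
        volatile boolean running;
        volatile boolean closedCleanly;

        void start() {
            running = true;
        }

        // Mirrors KafkaStreams#close(Duration), which returns false if shutdown did not
        // complete within the timeout. A real clean close is what leaves the per-task
        // checkpoint files behind.
        boolean close(Duration timeout) {
            running = false;
            closedCleanly = true;
            return true;
        }
    }

    // Registers a JVM shutdown hook. The JVM runs it on SIGTERM or SIGINT, never on SIGKILL.
    static Thread registerShutdownHook(StreamsApp app, CountDownLatch latch, Duration timeout) {
        Thread hook = new Thread(() -> {
            if (!app.close(timeout)) {
                System.err.println("Streams did not shut down within " + timeout);
            }
            latch.countDown(); // release main(), which is blocked in latch.await()
        }, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) throws InterruptedException {
        StreamsApp app = new StreamsApp();
        CountDownLatch latch = new CountDownLatch(1);
        // 30 s is an arbitrary example value. It must fit inside the container's stop
        // grace period (10 s by default for "docker stop", raised with "docker stop -t").
        registerShutdownHook(app, latch, Duration.ofSeconds(30));
        app.start();
        latch.await(); // block until a TERM/INT signal triggers the hook
    }
}
```

With this in place, `docker stop` sends SIGTERM first and SIGKILL only after the grace period. If closing all tasks takes longer than the grace period, the container is still killed uncleanly, so the close timeout and `docker stop -t` need to agree.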
Regards,
Neeraj

On Wednesday, 1 June, 2022, 04:38:16 pm GMT+10, John Roesler <vvcep...@apache.org> wrote:

Hi Neeraj,

Thanks for all that detail!

Your expectation is correct. You should see the checkpoint files after a _clean_ shutdown, and then you should not see it bootstrap from the beginning of the changelog on the next startup.

How are you shutting down the application? You'll want to call KafkaStreams#close and wait for it to complete before stopping the Java process.

I hope this helps,
-John

On Tue, May 31, 2022, at 23:09, Neeraj Vaidya wrote:
> Hi All,
> I have a KStreams application running inside a Docker container which uses a persistent key-value store.
>
> I have configured state.dir with a value of /tmp/kafka-streams (which is the default).
>
> When I start this container using "docker run", I mount /tmp/kafka-streams to a directory on my host machine, for example /mnt/storage/kafka-streams.
>
> My application.id is "myapp". I have 288 partitions in my input topic, which means my state store / changelog topic will also have that many partitions. Accordingly, when I start my Docker container, I see a folder for each partition, such as 0_1, 0_2 ... 0_288, under /mnt/storage/kafka-streams/myapp/
>
> When I shut down my application, I do not see any checkpoint file in any of the partition directories.
>
> And when I restart my application, it starts fetching the records from the changelog topic rather than reading from local disk. I suspect this is because there is no .checkpoint file in any of the partition directories.
>
> This is what I see in the startup log. It seems to be bootstrapping the entire state store from the changelog topic, i.e.
> performing network I/O rather than reading from what is on disk:
>
> "
> 2022-05-31T12:08:02.791 [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 0_170 before re-bootstrapping
> 2022-05-31T12:08:02.791 [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] Detected the states of tasks [0_170] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] are corrupted and hence needs to be re-initialized
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
>     at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
>     at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
>     at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
>     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> "
>
> 1) Should I expect to see a checkpoint file in each of the
> partition directories under /mnt/storage/kafka-streams/myapp/ when I shut down my application?
>
> 2) Is this an issue because I am running my KStreams app inside a Docker container? If there were permission issues, I would have expected to see problems creating the other files as well, such as .lock or the rocksdb folder (and its contents).
>
> My runtime environment is Docker 1.13.1 on RHEL 7.
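For question 1, Kafka Streams keeps each task's checkpoint in a file named `.checkpoint` inside the task directory (the `0_170`-style folders). A small JDK-only helper can list which task directories under `state.dir/application.id` lack that file after a shutdown. That makes it easy to check whether a particular way of stopping the container actually produced a clean close.

This is a sketch under two assumptions, both taken from this thread: task directories are named `<number>_<number>`, and the default path is the host-side mount `/mnt/storage/kafka-streams/myapp`. Run it only after the application has stopped. Under EOS the checkpoint may legitimately be absent while tasks are running, and, as the log above shows, a missing checkpoint at startup is what triggers the wipe and re-bootstrap.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CheckpointAudit {

    static final String CHECKPOINT_FILE = ".checkpoint";

    // Returns the sorted names of task directories (e.g. "0_170") under appDir
    // that contain no .checkpoint file. Non-task entries are ignored.
    static List<String> tasksWithoutCheckpoint(Path appDir) throws IOException {
        try (Stream<Path> entries = Files.list(appDir)) {
            return entries
                    .filter(Files::isDirectory)
                    // Assumed task-directory naming: <number>_<number>
                    .filter(dir -> dir.getFileName().toString().matches("\\d+_\\d+"))
                    .filter(dir -> !Files.exists(dir.resolve(CHECKPOINT_FILE)))
                    .map(dir -> dir.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the host-side mount of state.dir plus application.id from this thread.
        Path appDir = Paths.get(args.length > 0 ? args[0] : "/mnt/storage/kafka-streams/myapp");
        List<String> missing = tasksWithoutCheckpoint(appDir);
        if (missing.isEmpty()) {
            System.out.println("Every task directory has a " + CHECKPOINT_FILE + " file");
        } else {
            System.out.println(missing.size() + " task directories lack a "
                    + CHECKPOINT_FILE + " file: " + missing);
        }
    }
}
```

If the list is empty after `docker stop` but non-empty after `docker kill`, the difference lies in how the JVM is being stopped, not in Docker file permissions.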