Hi, I am using InMemoryStore along with GlobalKTable. I came to realize that I was losing on data once I restart my stream application while it was consuming data from kafka topic since it would always start with last saved checkpoint. This shall work fine with RocksDB it being a persistent store. for in memory store it should be consume from beginning.
Debugging it further, I looked at the code for GlobalStateManagerImpl(this one works for GlobalKTable) and was comparing the same with ProcessorStateManagerImpl(this one works for KTable). In ProcessorStateManagerImpl.checkpoint, we have added the check for when state store being persistent before writing the checkpoints, the same check is not there in GlobalStateManagerImpl.checkpoint method. Do you think the same check needs to be added for GlobalStateManagerImpl. public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) { log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets); checkpointedOffsets.putAll(changelogReader.restoredOffsets()); for (final Map.Entry<String, StateStore> entry : stores.entrySet()) { final String storeName = entry.getKey(); // only checkpoint the offset to the offsets file if // it is persistent AND changelog enabled * if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {* final String changelogTopic = storeToChangelogTopic.get( storeName); final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); if (ackedOffsets.containsKey(topicPartition)) { // store the last offset + 1 (the log position after restoration) checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1); } else if (restoredOffsets.containsKey(topicPartition)) { checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition)); } } } // write the checkpoint file before closing, to indicate clean shutdown try { if (checkpoint == null) { checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); } checkpoint.write(checkpointedOffsets); } catch (final IOException e) { log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e); } } Regards, -Sameer.