Hi,

I am using an in-memory store with a GlobalKTable. I realized that I was
losing data whenever I restarted my Streams application while it was
consuming from a Kafka topic, because on restart it always resumes from the
last saved checkpoint. That is correct for RocksDB, since it is a persistent
store and still holds its data after a restart. An in-memory store, however,
starts out empty, so it should be restored by consuming the topic from the
beginning.
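To make the failure mode concrete, here is a minimal, self-contained
simulation. It is not Kafka Streams code: the class RestoreSketch and its
methods are hypothetical, the topic partition is modelled as a list of
key/value records indexed by offset, and the store as a map. It shows that an
empty in-memory store restored from a checkpointed offset misses every record
before that offset, while replaying from offset 0 rebuilds it completely.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical simulation (not Kafka Streams code) of restoring a global store
// after a restart. The topic partition is a list of key/value records indexed by offset.
public class RestoreSketch {

    // Replays the topic from startOffset to the end into the given store.
    public static Map<String, String> restore(List<Map.Entry<String, String>> topic,
                                              Map<String, String> store,
                                              long startOffset) {
        for (long offset = startOffset; offset < topic.size(); offset++) {
            Map.Entry<String, String> rec = topic.get((int) offset);
            store.put(rec.getKey(), rec.getValue());
        }
        return store;
    }

    // On restart, resume from the checkpointed offset if there is one, else from 0.
    public static long startOffset(OptionalLong checkpoint) {
        return checkpoint.orElse(0L);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic = List.of(
            Map.entry("a", "1"), Map.entry("b", "2"), Map.entry("c", "3"),
            Map.entry("d", "4"), Map.entry("e", "5"));

        // Before the restart, offsets 0..2 were consumed and the checkpoint
        // recorded 3 as the next offset to read.
        OptionalLong checkpoint = OptionalLong.of(3L);

        // After the restart the in-memory store is empty, so resuming from the
        // checkpoint only recovers the records at offsets 3 and 4.
        Map<String, String> resumed = restore(topic, new TreeMap<>(), startOffset(checkpoint));
        System.out.println("resumed from checkpoint: " + resumed.keySet());

        // With no checkpoint for the in-memory store, every record is replayed.
        Map<String, String> replayed = restore(topic, new TreeMap<>(), startOffset(OptionalLong.empty()));
        System.out.println("replayed from beginning: " + replayed.keySet());
    }
}
```

Running main prints [d, e] for the store resumed from the checkpoint and
[a, b, c, d, e] for the one replayed from the beginning; the records at
offsets 0..2 are exactly the data that goes missing.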

Debugging further, I looked at the code of GlobalStateManagerImpl (used for
GlobalKTable) and compared it with ProcessorStateManager (used for KTable).

In ProcessorStateManager.checkpoint there is a check that a state store is
persistent before its offset is written to the checkpoint file, but the same
check is missing from GlobalStateManagerImpl.checkpoint. Do you think the same
check should be added to GlobalStateManagerImpl? Here is
ProcessorStateManager.checkpoint for reference:

    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
        log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
        checkpointedOffsets.putAll(changelogReader.restoredOffsets());
        for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
            final String storeName = entry.getKey();
            // only checkpoint the offset to the offsets file if
            // it is persistent AND changelog enabled
            // (this persistence check is what GlobalStateManagerImpl.checkpoint lacks)
            if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
                final String changelogTopic = storeToChangelogTopic.get(storeName);
                final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
                if (ackedOffsets.containsKey(topicPartition)) {
                    // store the last offset + 1 (the log position after restoration)
                    checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
                } else if (restoredOffsets.containsKey(topicPartition)) {
                    checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
                }
            }
        }
        // write the checkpoint file before closing, to indicate clean shutdown
        try {
            if (checkpoint == null) {
                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
            }
            checkpoint.write(checkpointedOffsets);
        } catch (final IOException e) {
            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
        }
    }
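As a rough sketch of what the proposed change would amount to, the model
below applies the same persistence filter to a global-store checkpoint. It is
a simplification under stated assumptions, not GlobalStateManagerImpl itself:
Store, offsetsToCheckpoint and the "topic-partition" string keys are
hypothetical, changelog handling is omitted, and offsets are checkpointed
exactly as they are passed in.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model (not Kafka Streams code) of a global-store checkpoint that
// applies a persistence check. Partitions are plain "topic-partition" strings
// rather than Kafka's TopicPartition, and offsets are written exactly as passed in.
public class GlobalCheckpointSketch {

    // Minimal stand-in for a StateStore: its name and whether it is persistent.
    public static final class Store {
        final String name;
        final boolean persistent;

        public Store(String name, boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }
    }

    // Returns the subset of ackedOffsets that would be written to the checkpoint
    // file: only partitions backing persistent stores are kept.
    public static Map<String, Long> offsetsToCheckpoint(Map<String, Store> stores,
                                                        Map<String, String> storeToPartition,
                                                        Map<String, Long> ackedOffsets) {
        Map<String, Long> checkpointable = new LinkedHashMap<>();
        for (Store store : stores.values()) {
            // Non-persistent (in-memory) stores are skipped, so no offset is
            // saved for their partitions.
            if (!store.persistent) {
                continue;
            }
            String partition = storeToPartition.get(store.name);
            if (partition != null && ackedOffsets.containsKey(partition)) {
                checkpointable.put(partition, ackedOffsets.get(partition));
            }
        }
        return checkpointable;
    }

    public static void main(String[] args) {
        Map<String, Store> stores = new LinkedHashMap<>();
        stores.put("rocksdb-store", new Store("rocksdb-store", true));
        stores.put("in-memory-store", new Store("in-memory-store", false));

        Map<String, String> storeToPartition = Map.of(
            "rocksdb-store", "topicA-0",
            "in-memory-store", "topicB-0");
        Map<String, Long> ackedOffsets = Map.of("topicA-0", 42L, "topicB-0", 17L);

        // Only topicA-0 (the persistent store's partition) is checkpointed.
        System.out.println(offsetsToCheckpoint(stores, storeToPartition, ackedOffsets));
    }
}
```

Running main prints {topicA-0=42}. The in-memory store's partition is left
out of the checkpoint, so, assuming the restore logic falls back to the
earliest offset when no checkpoint exists, that store would be rebuilt from
the beginning of its topic after a restart.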

Regards,
-Sameer.
