mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1509703745
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -203,13 +202,27 @@ public void registerStore(final StateStore store,
);
try {
- restoreState(
- stateRestoreCallback,
- topicPartitions,
- highWatermarks,
- store.name(),
- converterForStore(store)
- );
+ if (topology.storeNameToReprocessOnRestore().getOrDefault(store.name(), false)) {
+ globalConsumer.assign(topicPartitions);
+ globalConsumer.seekToBeginning(topicPartitions);
+ for (final TopicPartition topicPartition : topicPartitions) {
+ stateRestoreListener.onRestoreStart(topicPartition, store.name(),
+ checkpointFileCache.getOrDefault(topicPartition, 0L),
+ checkpointFileCache.getOrDefault(topicPartition, 0L));
+ stateRestoreListener.onRestoreEnd(topicPartition, store.name(), 0L);
Review Comment:
Where does the actual restore happen?
Note that the original `restoreState()` is the "bootstrapping phase" that runs
before we move to `RUNNING`, and we should preserve this behavior. It seems your
PR basically skips the bootstrapping and relies on regular processing to
re-read the data? In that case, we would go to `RUNNING` with an empty global
store, and lookups might fail because the data is not loaded yet.
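
To make the concern concrete, here is a minimal sketch in plain Java. It does not use any Kafka APIs, and every name in it (`GlobalStoreBootstrapSketch`, `Rec`, `GlobalStore`, `bootstrapThenRun`, `runWithoutBootstrap`) is hypothetical and for illustration only; it is not how `GlobalStateManagerImpl` is implemented. It assumes the topic can be modeled as an in-memory list and the high watermark as the list size at the time bootstrapping starts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GlobalStoreBootstrapSketch {

    enum State { CREATED, RUNNING }

    /** Hypothetical stand-in for a topic record; a null value acts as a tombstone. */
    static final class Rec {
        final String key;
        final String value;

        Rec(final String key, final String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical, simplified global store; not the Kafka Streams implementation. */
    static final class GlobalStore {
        private final Map<String, String> data = new HashMap<>();
        State state = State.CREATED;

        /** Bootstrapping: load everything up to the high watermark, then go to RUNNING. */
        void bootstrapThenRun(final List<Rec> log) {
            // Assumption: the high watermark is the end offset observed when bootstrapping starts.
            final long highWatermark = log.size();
            for (long offset = 0; offset < highWatermark; offset++) {
                apply(log.get((int) offset));
            }
            state = State.RUNNING;
        }

        /** The behavior the review comment warns about: RUNNING with nothing loaded. */
        void runWithoutBootstrap() {
            state = State.RUNNING;
        }

        /** Applies one record: a tombstone deletes the key, anything else upserts it. */
        void apply(final Rec rec) {
            if (rec.value == null) {
                data.remove(rec.key);
            } else {
                data.put(rec.key, rec.value);
            }
        }

        String lookup(final String key) {
            return data.get(key);
        }
    }

    public static void main(final String[] args) {
        final List<Rec> log = new ArrayList<>();
        log.add(new Rec("a", "v1"));
        log.add(new Rec("a", "v2"));

        final GlobalStore bootstrapped = new GlobalStore();
        bootstrapped.bootstrapThenRun(log);
        System.out.println("bootstrapped lookup: " + bootstrapped.lookup("a"));

        final GlobalStore skipped = new GlobalStore();
        skipped.runWithoutBootstrap();
        System.out.println("skipped lookup: " + skipped.lookup("a"));
    }
}
```

In this sketch both stores report `RUNNING`, but only the bootstrapped one can answer the lookup; the other returns `null` until regular processing catches up.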