ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r418838526



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -70,18 +69,16 @@ static void registerStateStores(final Logger log,
                 e
             );
         }
+
         log.debug("Acquired state directory lock");
 
         final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);
 
         // We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
         // the state stores have been registered; we should not try to load at the state manager construction time.
         // See https://issues.apache.org/jira/browse/KAFKA-8574
-        for (final StateStore store : topology.stateStores()) {
-            processorContext.uninitialize();
-            store.init(processorContext, store);
-            log.trace("Registered state store {}", store.name());
-        }
+        stateMgr.registerStateStores(topology.stateStores(), processorContext);

Review comment:
       We need to unregister and then re-register the changelogs during task type
conversion, because standby changelogs are handled differently from active ones
during registration (to disable standby processing during restoration).
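
A rough sketch of the unregister-then-register sequence described here, for illustration only. `SketchChangelogReader`, `TaskType`, and `convert` are hypothetical stand-ins and not the actual Kafka Streams classes or signatures; the only point is that a changelog registered under one task type has to be removed before it can be registered again under the other.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogConversionSketch {

    // Hypothetical task types; the real code distinguishes active and standby tasks.
    public enum TaskType { ACTIVE, STANDBY }

    // Hypothetical stand-in for a changelog reader that tracks, per changelog
    // partition, the task type it was registered under.
    public static class SketchChangelogReader {
        private final Map<String, TaskType> registered = new HashMap<>();

        public void register(final String partition, final TaskType type) {
            if (registered.containsKey(partition)) {
                throw new IllegalStateException("Changelog " + partition + " is already registered");
            }
            registered.put(partition, type);
        }

        public void unregister(final String partition) {
            registered.remove(partition);
        }

        public TaskType typeOf(final String partition) {
            return registered.get(partition);
        }
    }

    // Converts a task's changelogs to a new task type: unregister every
    // changelog first, then register each one again under the new type.
    public static void convert(final SketchChangelogReader reader,
                               final List<String> partitions,
                               final TaskType newType) {
        for (final String partition : partitions) {
            reader.unregister(partition);
        }
        for (final String partition : partitions) {
            reader.register(partition, newType);
        }
    }

    public static void main(final String[] args) {
        final SketchChangelogReader reader = new SketchChangelogReader();
        final List<String> partitions = Arrays.asList("store-changelog-0", "store-changelog-1");

        for (final String partition : partitions) {
            reader.register(partition, TaskType.STANDBY);
        }

        // Standby -> active conversion.
        convert(reader, partitions, TaskType.ACTIVE);
        System.out.println(reader.typeOf("store-changelog-0"));
    }
}
```

In this sketch, registering the same changelog twice without unregistering it first throws, which is the failure the unregister step is meant to avoid.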





