guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633182424



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       Actually, I just noticed that in this case the mock context does not use the `stateManager` at all but creates its own `StateManagerStub`. It therefore never calls `stateManager.registerStore`, so the stores set is always empty. I think this is okay, since in a unit test we are only checking each individual function's behavior (in this case, `initialize`) anyway.
   
   Also, as I browse through the code (see the other comment below): in this unit test the store would not yet be closed when the exception is thrown. In practice we would rely on `thread.shutdown` itself to shut down the global state manager and hence close all state stores.
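   To make the first point concrete, here is a minimal sketch under stated assumptions. `ContextDemo`, `Context`, `Manager`, `DelegatingContext` and `StubContext` are hypothetical toy stand-ins, not Kafka's `InternalProcessorContext`, `GlobalStateManagerImpl` or `StateManagerStub`. The sketch models only the flow where a store registers itself through whatever context it is initialized with. When that context is a stub, registration never reaches the manager, so the manager's tracked set stays empty.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model (not Kafka's real classes): a store registers itself
// through the context it is initialized with.
public class ContextDemo {

    interface Context {
        void register(String storeName);
    }

    // Stand-in for the state manager: it only knows about stores registered with it.
    static final class Manager {
        final Set<String> stores = new HashSet<>();

        void registerStore(String storeName) {
            stores.add(storeName);
        }
    }

    // Production-like context: forwards registration to the manager.
    static final class DelegatingContext implements Context {
        private final Manager manager;

        DelegatingContext(Manager manager) {
            this.manager = manager;
        }

        @Override
        public void register(String storeName) {
            manager.registerStore(storeName);
        }
    }

    // Mock context backed by its own stub: registration never reaches the manager.
    static final class StubContext implements Context {
        @Override
        public void register(String storeName) {
            // intentionally a no-op
        }
    }

    // Simulates initializing one store against a fresh manager.
    static Manager initialize(boolean useStubContext) {
        Manager manager = new Manager();
        Context context = useStubContext ? new StubContext() : new DelegatingContext(manager);
        context.register("global-store"); // what the store's init() would do
        return manager;
    }

    public static void main(String[] args) {
        System.out.println(initialize(false).stores); // [global-store]
        System.out.println(initialize(true).stores);  // []
    }
}
```

   With the stub context, any assertion on the manager's stores sees an empty set. That is acceptable when the test only targets `initialize` itself.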

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       I add the store after the store-name checks: if a store with the same name has already been registered, we would not be able to book-keep both of them anyway.
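   The ordering described above (validate first, then book-keep, then do work that may fail) can be sketched as follows. This is a hypothetical toy, not the real `GlobalStateManagerImpl`. `RegisterOrderDemo`, `ToyStore`, `ToyGlobalStateManager` and the `failRestore` flag are illustrative assumptions, with `failRestore` standing in for any failure that happens after the store has been added.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the registration order; not Kafka's actual code.
public class RegisterOrderDemo {

    static final class ToyStore {
        final String name;
        boolean closed = false;

        ToyStore(String name) {
            this.name = name;
        }

        void close() {
            closed = true;
        }
    }

    static final class ToyGlobalStateManager {
        private final Set<String> knownStoreNames;
        final Map<String, ToyStore> globalStores = new LinkedHashMap<>();

        ToyGlobalStateManager(Set<String> knownStoreNames) {
            this.knownStoreNames = knownStoreNames;
        }

        void registerStore(ToyStore store, boolean failRestore) {
            // 1. Validate first: a rejected store is never put into the map, so it
            //    cannot overwrite the entry of a store registered earlier.
            if (globalStores.containsKey(store.name)) {
                throw new IllegalArgumentException("Store " + store.name + " has already been registered");
            }
            if (!knownStoreNames.contains(store.name)) {
                throw new IllegalArgumentException("Store " + store.name + " is not a known global store");
            }
            // 2. Book-keep before any work that might throw.
            globalStores.put(store.name, store);
            // 3. Stand-in for restoration work that may fail.
            if (failRestore) {
                throw new IllegalStateException("Restoring " + store.name + " failed");
            }
        }

        // Closes every store that made it into the map.
        void close() {
            for (ToyStore store : globalStores.values()) {
                store.close();
            }
        }
    }

    // Returns {first closed, failing closed, duplicate closed, first still mapped}.
    static boolean[] scenario() {
        ToyGlobalStateManager manager = new ToyGlobalStateManager(Set.of("a", "b"));
        ToyStore first = new ToyStore("a");
        ToyStore duplicate = new ToyStore("a");
        ToyStore failing = new ToyStore("b");

        manager.registerStore(first, false);
        try {
            manager.registerStore(duplicate, false);
        } catch (IllegalArgumentException expected) {
            // rejected by the duplicate check; the map still holds `first`
        }
        try {
            manager.registerStore(failing, true);
        } catch (IllegalStateException expected) {
            // fails after the put, so close() can still reach it
        }
        manager.close();
        return new boolean[] {first.closed, failing.closed, duplicate.closed, manager.globalStores.get("a") == first};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(scenario())); // [true, true, false, true]
    }
}
```

   The rejected duplicate is not closed by the manager. This is consistent with the point above that only one of the two could be book-kept anyway.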

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       Hmm, in production, do we ever restart a thread, even for an illegal-state or illegal-argument exception?
   
   Anyway, I think your concern is still valid: at least for test code, there is a risk. Since `store.init()` is implemented by each (possibly customized) state store implementation, we cannot really enforce closing there. Today, for both the global and the local state manager, we throw the exception from `initialize state stores` all the way up to `thread.run` and on to the user's exception handler. We do eventually call `thread.shutdown`, which closes all tasks. However, in the case you raised the state store would not yet be in the `stateManager.stores` set and hence would be leaked.
   
   What we could do, in both the local and the global state manager, is move the `globalStores.put` / `stores.put` call to the beginning, before any checks. Then, when we throw and eventually call `thread.shutdown`, the stores are already in the set and get closed.
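   A minimal sketch of the leak and of the proposed reordering, assuming a simplified manager. `RegisterFirstDemo`, `ToyStore`, `ToyStateManager` and the `putFirst` / `checkFails` flags are hypothetical. `manager.close()` stands in for the cleanup that `thread.shutdown` would eventually trigger.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model comparing "check, then put" with "put, then check".
public class RegisterFirstDemo {

    static final class ToyStore {
        final String name;
        boolean closed = false;

        ToyStore(String name) {
            this.name = name;
        }

        void close() {
            closed = true;
        }
    }

    static final class ToyStateManager {
        private final boolean putFirst;
        final Map<String, ToyStore> stores = new HashMap<>();

        ToyStateManager(boolean putFirst) {
            this.putFirst = putFirst;
        }

        void registerStore(ToyStore store, boolean checkFails) {
            if (putFirst) {
                stores.put(store.name, store); // proposed: track the store before any check
            }
            if (checkFails) {
                throw new IllegalStateException("Registration check failed for " + store.name);
            }
            if (!putFirst) {
                stores.put(store.name, store); // checks first: tracked only if all checks pass
            }
        }

        // Closes only the stores the manager knows about.
        void close() {
            for (ToyStore store : stores.values()) {
                store.close();
            }
        }
    }

    // Simulates a failing registration followed by shutdown; returns whether the store got closed.
    static boolean closedAfterShutdown(boolean putFirst) {
        ToyStateManager manager = new ToyStateManager(putFirst);
        ToyStore store = new ToyStore("global-store");
        try {
            manager.registerStore(store, true);
        } catch (IllegalStateException e) {
            // In Streams the exception would propagate up to thread.run and the
            // user's exception handler; here it is simply swallowed.
        } finally {
            manager.close(); // stand-in for thread.shutdown closing the state manager
        }
        return store.closed;
    }

    public static void main(String[] args) {
        System.out.println("checks first: closed=" + closedAfterShutdown(false)); // closed=false (leaked)
        System.out.println("put first:    closed=" + closedAfterShutdown(true));  // closed=true
    }
}
```

   In the toy, the checks-first ordering leaves the store unclosed, while putting it into the map first lets the shutdown path close it. The toy does not cover a custom `store.init()` that throws before it ever registers. As noted above, closing in that case cannot be enforced from the state manager.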

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -62,21 +62,22 @@
 public class GlobalStateManagerImpl implements GlobalStateManager {
     private final static long NO_DEADLINE = -1L;
 
-    private final Logger log;
     private final Time time;
-    private final Consumer<byte[], byte[]> globalConsumer;
+    private final Logger log;
     private final File baseDir;
-    private final Set<String> globalStoreNames = new HashSet<>();

Review comment:
       Just re-ordering member fields here, no adding/removing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

