cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633518578



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, 
stateStore);

Review comment:
       > Hmm, for production, do we ever restart a thread even for 
illegal-state or illegal-argument?
   
   If the user decides to restart a stream thread in its exception handler it 
is possible.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -328,7 +328,8 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
                 converterForStore(store)) :
             new StateStoreMetadata(store);
 
-
+        // register the store first, so that if later an exception is thrown 
then eventually while we call `close`
+        // on the state manager this state store would be closed as well
         stores.put(storeName, storeMetadata);

Review comment:
       See my comment above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       If I put 
   ```
           assertThat(store1.isOpen(), is(false));
           assertThat(store2.isOpen(), is(false));
           assertThat(store3.isOpen(), is(false));
           assertThat(store4.isOpen(), is(false));
   ```
   on line 202 in `shouldThrowStreamsExceptionForOldTopicPartitions()` the test 
fails. Hence, we leak a state store.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to 
register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown 
then eventually while we call `close`

Review comment:
       I agree that we would not be able to book-keep both, but the state store 
in `store` that we just opened is still open in line 172. So we need to close 
the state store in `store` before throwing the exception otherwise we will leak 
it. The same applies to line 176.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to