ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r755680080



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder 
newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets 
for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final Set<TaskMetadata> tasks = new HashSet<>(t.activeTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> 
topologyMetadata.sourceTopicsForTopology(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+        final KafkaFuture<Void> removeTopologyFuture = 
topologyMetadata.unregisterTopology(topologyToRemove);
 
-        topologyMetadata.unregisterTopology(topologyToRemove);
+        if (resetOffsets) {
+            final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            log.info("partitions to reset: {}", partitionsToReset);

Review comment:
       ```suggestion
               log.info("Resetting offsets for the following partitions of NamedTopology {}: {}", topologyToRemove, partitionsToReset);
       ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {

Review comment:
       nit: this method name has been confusing me, since it seems to imply that 
it returns a boolean indicating whether we've reached the latest version. Can 
you give it a more descriptive name?
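
For illustration, here is a minimal sketch of the naming distinction, assuming a simplified tracker that only stores per-thread versions. `ThreadVersionTracker`, `bumpTopologyVersion`, and `markThreadAtLatestVersion` are hypothetical names that do not come from the PR, and the sketch leaves out the locking and waiter handling the real method performs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the version bookkeeping in TopologyMetadata.
class ThreadVersionTracker {
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
    private final AtomicLong topologyVersion = new AtomicLong(0L);

    public void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    // Hypothetical helper standing in for a topology being added or removed.
    public void bumpTopologyVersion() {
        topologyVersion.incrementAndGet();
    }

    // Query: reads as a predicate and returns a boolean.
    // Like the code under review, this assumes the thread has been registered.
    public boolean needsUpdate(final String threadName) {
        return threadVersions.get(threadName) < topologyVersion.get();
    }

    // Command: the verb-first name signals a state change with no return value.
    public void markThreadAtLatestVersion(final String threadName) {
        threadVersions.put(threadName, topologyVersion.get());
    }
}
```

With a command-style name, the call site reads as an action, while `needsUpdate` remains the only method that looks like a question.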

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder 
newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets 
for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final Set<TaskMetadata> tasks = new HashSet<>(t.activeTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> 
topologyMetadata.sourceTopicsForTopology(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+        final KafkaFuture<Void> removeTopologyFuture = 
topologyMetadata.unregisterTopology(topologyToRemove);
 
-        topologyMetadata.unregisterTopology(topologyToRemove);
+        if (resetOffsets) {
+            final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                removeTopologyFuture.whenComplete((v, throwable) -> {
+                    if (throwable != null) {
+                        future.completeExceptionally(throwable);
+                    }
+                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = 
null;
+                    while (deleteOffsetsResult == null) {

Review comment:
       Is the point of this loop to account for multi-node clusters, where the 
deleteOffsetsRequest might throw an exception? If so, can we catch that 
specific exception and retry only on it, rather than retrying on every 
ExecutionException? Those could be any number of potentially fatal things, I think.
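
A minimal sketch of the selective retry being asked for, written against plain `java.util.concurrent` so that it runs without Kafka on the classpath. `SelectiveRetry`, `TransientDeleteException`, and `getWithSelectiveRetry` are hypothetical names. The exception that is actually safe to retry on is not identified in this thread, so the stand-in type below is an assumption, as is the cap on attempts (the loop under review is unbounded).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

class SelectiveRetry {

    // Hypothetical stand-in for the single exception that is expected to be transient.
    static class TransientDeleteException extends RuntimeException {
        TransientDeleteException(final String message) {
            super(message);
        }
    }

    // Re-issues the request only when the failure cause is the expected transient
    // exception; any other ExecutionException is rethrown immediately.
    static <T> T getWithSelectiveRetry(final Supplier<CompletableFuture<T>> request,
                                       final int maxAttempts)
            throws ExecutionException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ExecutionException lastTransientFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.get().get();
            } catch (final ExecutionException e) {
                if (!(e.getCause() instanceof TransientDeleteException)) {
                    throw e; // potentially fatal, so do not retry
                }
                lastTransientFailure = e;
            }
        }
        // Every attempt failed with the transient exception; surface the last one.
        throw lastTransientFailure;
    }
}
```

The point of the sketch is the `instanceof` check on `getCause()`: the retry decision is made per exception type instead of treating every `ExecutionException` as retriable.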

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = 
version.activeTopologyWaiters.listIterator();

Review comment:
       I see @guozhangwang's point, but I think it's not worth obsessing over. 
@wcarlson5 how about you just file a V1.1 ticket for cleanup like this? It doesn't 
seem worth blocking the PR over :)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder 
newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure

Review comment:
       And/or just temporarily change the return type here back to `void`, 
since it's pretty much a blocking call for now.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder 
newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure

Review comment:
       Can you file an AK ticket and leave a TODO here mentioning that this 
method is not actually async at the moment? Since we have some interest from 
Kafka Streams users who want to try out this feature before it's officially 
published, we should at least make sure the javadocs reflect the current 
behavior of these APIs
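
A rough sketch of what such a javadoc note could look like, assuming "not actually async" means that the work is finished before the method returns. `BlockingRemovalSketch` and `removeTopology` are hypothetical, and `KAFKA-XXXXX` is a placeholder because no ticket has been filed in this thread.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the wrapper class; only the javadoc shape matters here.
class BlockingRemovalSketch {

    /**
     * Remove the topology with the given name.
     * <p>
     * TODO (KAFKA-XXXXX, placeholder ticket id): this method is not actually asynchronous yet.
     * The removal runs on the calling thread, so the returned future is already complete
     * by the time this method returns.
     *
     * @param topologyName    name of the topology to remove
     * @param blockingRemoval the removal work, run synchronously (assumption for this sketch)
     */
    static CompletableFuture<Void> removeTopology(final String topologyName,
                                                  final Runnable blockingRemoval) {
        blockingRemoval.run();
        return CompletableFuture.completedFuture(null);
    }
}
```

Documenting the blocking behavior this way keeps the future-returning signature while telling early users not to rely on the call returning quickly.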

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via 
#addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || 
topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || 
topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, 
triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
       How about we just call `subscribeConsumer()` here without any `if`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


