ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752809604



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = 
version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;
+                if (verison <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= 
verison)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("thread {} is now on on version {}", 
threadName, topologyVersionWaiters.topologyVersion);

Review comment:
       ```suggestion
                           log.info("Thread {} is now on topology version {}", 
threadName, topologyVersionWaiters.topologyVersion);
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+

Review comment:
       super nit: extra line break

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = 
version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;

Review comment:
       nit: typo in "verison". Also, since `version` is already taken by the 
`TopologyVersion` field, we should rename either that field or this variable 
(it probably makes sense to rename the field to `topologyVersion`, 
`currentTopologyVersion`, `latestTopologyVersion`, etc.)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets 
for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> 
topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = 
topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       We shouldn't call `get()` on the futures; that's up to the user to do. 
Technically speaking, the point of this isn't to make the APIs blocking, but to 
enable users to block on their completion -- i.e., we want to return the 
`RemoveNamedTopologyResult` ASAP and allow the user to block on it if they want.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -578,6 +577,7 @@ public void run() {
             failedStreamThreadSensor.record();
             this.streamsUncaughtExceptionHandler.accept(e);
         } finally {
+            topologyMetadata.unregisterThread(threadMetadata.threadName());

Review comment:
       This should be moved inside the `completeShutdown()` method. We register 
the thread inside the StreamThread constructor, but it's possible for the 
thread to be shut down before it ever starts running, in which case we'd 
still want to make sure it gets unregistered.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {

Review comment:
       ```suggestion
       public Collection<String> sourceTopicsForTopology(final String name) {
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets 
for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());

Review comment:
       We don't need to include standby tasks, since they don't commit offsets 
on the input topics (and IMO we should not reset offsets on changelog or 
repartition topics, but leave it up to the user to clean those up as they see fit).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = 
version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;
+                if (verison <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= 
verison)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("thread {} is now on on version {}", 
threadName, topologyVersionWaiters.topologyVersion);
+                        rebalance = true;

Review comment:
       This is a little tricky, but I think it might make sense to do the 
reverse. It has to do with how a thread handles being assigned tasks that it 
doesn't recognize because it hasn't yet updated its topology: it just makes a 
note of the assigned task name and stashes it away until it sees a topology 
update, at which point it checks whether the task is part of the new topology 
and, if so, goes on to create it. Which is a long way of saying that it's 
better to rebalance earlier rather than later, i.e. to rebalance when the 
_first_ thread processes a topology update rather than when the _last_ thread 
has processed the update. Partly this is so that the first thread can go ahead 
and start processing tasks from that topology right away, and partly because if 
we wait until the last thread acks the topology update, we could get in trouble 
and end up never rebalancing in extreme cases, for example if a thread gets 
stuck in processing (like if the user has bad custom logic that sends it into an 
infinite loop, which is a real example from an escalation I once had).
   
   Happy to chat over Zoom if you want to dig into this a bit more. I thought I 
had left a more specific TODO comment about when we do/don't want to rebalance, 
but I just read it again and it's wildly unhelpful 😅 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -61,6 +74,8 @@ public KafkaStreamsNamedTopologyWrapper(final Properties 
props, final KafkaClien
 
     private KafkaStreamsNamedTopologyWrapper(final StreamsConfig config, final 
KafkaClientSupplier clientSupplier) {
         super(new TopologyMetadata(new ConcurrentSkipListMap<>(), config), 
config, clientSupplier);
+        final LogContext logContext = new LogContext();

Review comment:
       I think you can just instantiate the field like this instead of going 
through the `LogContext`; I checked that class and it seems to do the same 
thing as just
   ``` 
   private final Logger log = 
LoggerFactory.getLogger(KafkaStreamsNamedTopologyWrapper.class);
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets 
for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> 
topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+

Review comment:
       nit: extra line breaks

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets 
for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();

Review comment:
       super nit:
   ```suggestion
                   final Set<TaskMetadata> tasks = new HashSet<>();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to