ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752831095



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
-     * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
                                                    " as another of the same 
name already exists");
         }
-        
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
-     * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets 
for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has 
already shut down
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> 
topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = 
topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       Hm, I see. Ok that's potentially going to be a bit of an issue with 
multi-node clusters...let's chat about this tomorrow, I have some thoughts on a 
few different approaches we might want to consider here.
   
   That said, we still should not call `get()` here, can we just move the 
offset reset into `TopologyMetadata` and do it after the last thread has 
processed the topology removal? I know it's similar but I really think we 
should make sure to return from `removeNamedTopology()` ASAP so that it's async 
and the caller can in theory do some other stuff if they wanted to before 
blocking on the result




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to