guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r750795877



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -121,9 +164,9 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         if (isEmpty() && threadState.get().isAlive()) {
             try {
                 lock();
-                while (isEmpty() && threadState.get().isAlive()) {

Review comment:
       See my other comment: I think `maybeWaitForNonEmptyTopology` is no longer 
needed, since its only value is the `version.topologyCV.await();` call, which 
we can just move up into the caller's while loop.
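
   A minimal sketch of that inlining, assuming a hypothetical 
`TopologyVersionSketch` class that stands in for the lock, the `topologyCV` 
condition and the version counter (none of these names or methods are the 
actual `TopologyMetadata` API): the caller loops on the emptiness check and 
awaits the condition directly.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the version-tracking state; not the real TopologyMetadata.
class TopologyVersionSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition topologyCV = lock.newCondition();
    private long topologyVersion = 0L;
    private int numTopologies = 0;

    // Registers a topology, bumps the version and wakes up every waiting thread.
    void registerTopology() {
        lock.lock();
        try {
            numTopologies++;
            topologyVersion++;
            topologyCV.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // The wait inlined into the caller-side loop: block while there is nothing to
    // process and the thread is still alive, then return the version that was seen.
    long awaitNonEmptyTopology(final BooleanSupplier threadIsAlive) throws InterruptedException {
        lock.lock();
        try {
            while (numTopologies == 0 && threadIsAlive.getAsBoolean()) {
                topologyCV.await();
            }
            return topologyVersion;
        } finally {
            lock.unlock();
        }
    }
}
```

   The `while` re-check guards against spurious wakeups. Whoever changes the 
thread state would also need to signal the condition, so that a thread that is 
shutting down does not stay parked.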

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       As for the future: I think it's okay to let the first future wait a bit 
longer if there are consecutive operations, i.e. in the implementation sketched 
in my other comment we would complete all futures registered so far once every 
still-alive thread has caught up with the bumped version.
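
   A minimal sketch of that completion behaviour, assuming a hypothetical 
`PendingTopologyFutures` registry and using `java.util.concurrent.CompletableFuture` 
as a stand-in for whatever future type the PR returns: every add/remove call 
registers a future, and all of them are completed together once the last live 
thread reports that it has caught up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical registry; the names are illustrative and not part of the PR.
class PendingTopologyFutures {
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Called once per add/remove operation; the caller gets a future to wait on.
    synchronized CompletableFuture<Void> register() {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Called when every still-alive thread has reached the latest version:
    // completes all futures registered so far, including the earliest one,
    // and returns how many were completed.
    synchronized int completeAll() {
        final int completed = pending.size();
        for (final CompletableFuture<Void> future : pending) {
            future.complete(null);
        }
        pending.clear();
        return completed;
    }
}
```

   With two back-to-back operations, the first future simply stays pending 
until `completeAll()` runs for the final version, which is the extra wait 
described above.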

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-

Review comment:
       For line 915 below: do we still need this line? Could we just inline the 
wait on the `Condition topologyCV` within this while loop now?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +134,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.error("adding {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.error("Removing {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+//            .filter(t -> topologyMetadata.sourceTopicCollection().contains(t))

Review comment:
       Is commenting out this filter intentional?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +134,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.error("adding {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.error("Removing {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+//            .filter(t -> topologyMetadata.sourceTopicCollection().contains(t))

Review comment:
       Also, this logic seems to just include all partitions of all tasks, 
since it does not involve `topologyToRemove` at all?
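
   A minimal sketch of a filter that does involve the topology being removed, 
assuming a hypothetical helper that is handed the source topics of 
`topologyToRemove` (how those are looked up is not shown in this diff) and a 
simplified stand-in for `TopicPartition`:

```java
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; SimplePartition is a simplified stand-in for Kafka's TopicPartition.
class PartitionResetSketch {
    static final class SimplePartition {
        final String topic;
        final int partition;

        SimplePartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof SimplePartition)) {
                return false;
            }
            final SimplePartition other = (SimplePartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Keeps only the partitions whose topic is a source topic of the removed topology.
    static Set<SimplePartition> partitionsToReset(final Collection<SimplePartition> localPartitions,
                                                  final Set<String> sourceTopicsOfRemovedTopology) {
        return localPartitions.stream()
            .filter(p -> sourceTopicsOfRemovedTopology.contains(p.topic))
            .collect(Collectors.toSet());
    }
}
```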

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -83,6 +100,7 @@ public TopologyMetadata(final InternalTopologyBuilder builder,
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
+        getStreamThreadCount = () -> getNumStreamThreads(config);

Review comment:
       Why do we need to initialize this supplier in the constructor (ditto 
below)? This variable is only called in `reachedVersion`, by which point the 
stream threads have already been initialized.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       I feel this is getting more complicated than necessary for 
KAFKA-12648: we have a `topologyMetadata` at the `KafkaStreams` layer, while at 
the `StreamThread` layer we keep a `long lastSeenTopologyVersion`. If we let 
the `topologyMetadata` object, which is shared among all threads (as well as 
their task managers etc.), track each thread's version, then we can simplify 
this. Just a rough thought here:
   
   * In `TopologyMetadata` we maintain a `Map<String, Long>` from thread name 
to that thread's current topology version. When threads are added or removed, 
this map is updated as well (in a synchronized way).
   * Inside a thread's `checkForTopologyUpdates`, in a synchronized block, check 
whether the versions of all threads other than this one equal the current 
version: if yes, this thread updates its own map entry as well and then 
triggers a rebalance; otherwise, it only updates its own map entry.
   
   So if consecutive topology updates are issued, we would only trigger a 
rebalance at the end, when all threads reach the final topology version.
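
   The two bullets above can be sketched roughly as follows. This is only an 
illustration under stated assumptions: `ThreadVersionTracker` and its method 
names are hypothetical, and returning `false` for a thread that is already at 
the current version is an added assumption, not something spelled out above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shared tracker; names are illustrative, not the actual TopologyMetadata API.
class ThreadVersionTracker {
    private final Map<String, Long> threadVersions = new HashMap<>();
    private long topologyVersion = 0L;

    // A newly added thread starts out at the current version.
    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
    }

    synchronized void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
    }

    // Called for every add or remove of a named topology.
    synchronized void bumpVersion() {
        topologyVersion++;
    }

    // Moves the given thread to the current version. Returns true only if this
    // thread was behind and every other thread had already caught up, i.e. this
    // thread is the last one to arrive and should trigger the rebalance.
    synchronized boolean catchUp(final String threadName) {
        final Long previous = threadVersions.get(threadName);
        if (previous == null || previous.longValue() == topologyVersion) {
            return false; // unknown thread, or nothing new to catch up on
        }
        boolean othersCaughtUp = true;
        for (final Map.Entry<String, Long> entry : threadVersions.entrySet()) {
            if (!entry.getKey().equals(threadName)
                    && entry.getValue().longValue() != topologyVersion) {
                othersCaughtUp = false;
                break;
            }
        }
        threadVersions.put(threadName, topologyVersion);
        return othersCaughtUp;
    }
}
```

   With two threads and two back-to-back version bumps, the first thread to 
call `catchUp` only records its version, and the second one is told to trigger 
the single rebalance.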




