wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r754759558



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);

Review comment:
       Before add or remove is called, the version is 0; once we add a topology,
it becomes 1.
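To make the 0 → 1 transition concrete, here is a minimal sketch of the per-thread bookkeeping from the hunk above. It assumes a simplified model: the class name `ThreadVersionSketch` and the `addTopology()` helper are hypothetical stand-ins for `TopologyMetadata` and `registerAndBuildNewTopology`, and locking is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the per-thread version bookkeeping in the diff:
// the topology version starts at 0 and each add/remove bumps it by one.
public class ThreadVersionSketch {
    private final AtomicLong topologyVersion = new AtomicLong(0L);
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();

    long topologyVersion() {
        return topologyVersion.get();
    }

    // A newly registered thread starts at 0, the version before any add/remove call.
    void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    // Hypothetical stand-in for adding (or removing) a named topology.
    void addTopology() {
        topologyVersion.incrementAndGet();
    }

    // True while the thread has not yet seen the latest topology version.
    boolean needsUpdate(final String threadName) {
        return threadVersions.get(threadName) < topologyVersion();
    }

    // The thread records that it has caught up to the current version.
    void reachedLatestVersion(final String threadName) {
        threadVersions.put(threadName, topologyVersion());
    }

    public static void main(final String[] args) {
        final ThreadVersionSketch metadata = new ThreadVersionSketch();
        metadata.registerThread("thread-1");
        System.out.println(metadata.needsUpdate("thread-1")); // false: both at 0
        metadata.addTopology();
        System.out.println(metadata.needsUpdate("thread-1")); // true: thread at 0, topology at 1
        metadata.reachedLatestVersion("thread-1");
        System.out.println(metadata.needsUpdate("thread-1")); // false: thread caught up to 1
    }
}
```

Since registration happens before any add/remove, a fresh thread never reports `needsUpdate` until the first topology change.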

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -135,31 +189,42 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         }
     }
 
-    public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+    /**
+     * Adds the topology and registers a future that listens for all threads on the older version to see the update
+     */
+    public KafkaFuture<Void> registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         try {
             lock();
             version.topologyVersion.incrementAndGet();
             log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future));

Review comment:
       responded to your other comment :)
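For reference, the registration half of the change above (bump the version and enqueue a waiter under the lock, then hand the future back) could be sketched as follows. This assumes `java.util.concurrent.CompletableFuture` as a stand-in for Kafka's `KafkaFutureImpl`; `RegisterTopologySketch` and `VersionWaiter` are hypothetical names, and the topology builder is reduced to a name string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class RegisterTopologySketch {
    // Pairs a topology version with the future to complete once every thread has seen it.
    static final class VersionWaiter {
        final long version;
        final CompletableFuture<Void> future;

        VersionWaiter(final long version, final CompletableFuture<Void> future) {
            this.version = version;
            this.future = future;
        }
    }

    private final ReentrantLock topologyLock = new ReentrantLock();
    private final AtomicLong topologyVersion = new AtomicLong(0L);
    final List<VersionWaiter> activeWaiters = new ArrayList<>();

    // Bumps the version and registers a waiter under the lock, then returns the future.
    CompletableFuture<Void> registerAndBuildNewTopology(final String topologyName) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        topologyLock.lock();
        try {
            final long newVersion = topologyVersion.incrementAndGet();
            System.out.printf("Adding NamedTopology %s, latest topology version is %d%n",
                topologyName, newVersion);
            activeWaiters.add(new VersionWaiter(newVersion, future));
        } finally {
            topologyLock.unlock();
        }
        return future;
    }

    long topologyVersion() {
        return topologyVersion.get();
    }

    public static void main(final String[] args) throws Exception {
        final RegisterTopologySketch metadata = new RegisterTopologySketch();
        final CompletableFuture<Void> added = metadata.registerAndBuildNewTopology("topology-A");
        // Simulate the last stream thread catching up and completing the waiter.
        new Thread(() -> added.complete(null)).start();
        // The caller blocks (with a timeout) until the threads have seen the update.
        added.get(5, TimeUnit.SECONDS);
        System.out.println("version " + metadata.topologyVersion() + " acknowledged");
    }
}
```

The sketch acquires the lock before entering `try`, which is the idiom shown in the `java.util.concurrent.locks.Lock` Javadoc; the diff's `try { lock(); ... }` form behaves the same as long as `lock()` itself cannot throw.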

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
       The reason I would prefer to keep it as a list of `topologyVersionWaiters`
is that the topology can be updated again while threads are still reaching a
version. Maybe I have been looking at this too long, but this way seems easier
to keep track of to me. The complexity is the number of times the topology has
been updated times the number of threads. Realistically, the number of threads
is bounded, so we can treat it as a constant. And we would have the same number
of futures to complete either way (one per topology update). I think the little
extra verbosity makes it easier to understand what is really going on. But then
maybe I am too close to this code :|
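The list-of-waiters argument can be illustrated with a small sketch in which a second update arrives while one thread is still behind, so two waiters are pending at once. This assumes `CompletableFuture` in place of `KafkaFutureImpl`; `VersionWaitersSketch`, `updateTopology()` and `pendingWaiters()` are hypothetical names, and completing a waiter once the slowest thread has reached its version is this sketch's reading of the behavior, not necessarily the PR's exact rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

public class VersionWaitersSketch {
    static final class VersionWaiter {
        final long version;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        VersionWaiter(final long version) {
            this.version = version;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<VersionWaiter> activeWaiters = new ArrayList<>();
    private long topologyVersion = 0L;

    void registerThread(final String threadName) {
        lock.lock();
        try {
            threadVersions.put(threadName, 0L);
        } finally {
            lock.unlock();
        }
    }

    // Each add/remove gets its own waiter, even while earlier waiters are still pending.
    CompletableFuture<Void> updateTopology() {
        lock.lock();
        try {
            topologyVersion++;
            final VersionWaiter waiter = new VersionWaiter(topologyVersion);
            activeWaiters.add(waiter);
            return waiter.future;
        } finally {
            lock.unlock();
        }
    }

    // Called by a thread once it has processed the latest topology version.
    void reachedLatestVersion(final String threadName) {
        lock.lock();
        try {
            threadVersions.put(threadName, topologyVersion);
            // The slowest thread determines which versions every thread has seen.
            final long minVersion = threadVersions.values().stream()
                .mapToLong(Long::longValue).min().orElse(topologyVersion);
            // Scan only the still-pending waiters and complete those all threads have reached.
            final Iterator<VersionWaiter> iterator = activeWaiters.iterator();
            while (iterator.hasNext()) {
                final VersionWaiter waiter = iterator.next();
                if (waiter.version <= minVersion) {
                    waiter.future.complete(null);
                    iterator.remove();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    int pendingWaiters() {
        lock.lock();
        try {
            return activeWaiters.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) {
        final VersionWaitersSketch metadata = new VersionWaitersSketch();
        metadata.registerThread("A");
        metadata.registerThread("B");
        final CompletableFuture<Void> v1 = metadata.updateTopology();
        metadata.reachedLatestVersion("A");                           // A=1, B=0
        final CompletableFuture<Void> v2 = metadata.updateTopology(); // update while B lags
        metadata.reachedLatestVersion("B");                           // A=1, B=2
        System.out.println(v1.isDone() + " " + v2.isDone());          // true false
        metadata.reachedLatestVersion("A");                           // A=2, B=2
        System.out.println(v1.isDone() + " " + v2.isDone());          // true true
    }
}
```

Each waiter carries its own version, so a second update while thread `B` lags does not disturb the bookkeeping for the first one.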

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
       If we are removing the last topology, we need to unsubscribe for a few
reasons, one of which is that we can't reset offsets while still subscribed.
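The branching in `checkForTopologyUpdates` can be modeled with a toy consumer that enforces the constraint from this comment: resetting offsets for a topic is rejected while still subscribed to it. Everything here is hypothetical (`UnsubscribeOnEmptySketch`, `ConsumerModel`, `resetOffsets`); it is not the Kafka consumer or admin API, and resubscribing to the remaining topics stands in for "triggering a rebalance".

```java
import java.util.HashSet;
import java.util.Set;

public class UnsubscribeOnEmptySketch {
    // Hypothetical stand-in for a consumer; it only models the rule that offsets
    // cannot be reset for a topic the group is still subscribed to.
    static final class ConsumerModel {
        private final Set<String> subscription = new HashSet<>();

        void subscribe(final Set<String> topics) {
            subscription.clear();
            subscription.addAll(topics);
        }

        void unsubscribe() {
            subscription.clear();
        }

        boolean isSubscribed() {
            return !subscription.isEmpty();
        }

        void resetOffsets(final String topic) {
            if (subscription.contains(topic)) {
                throw new IllegalStateException("cannot reset offsets for subscribed topic " + topic);
            }
        }
    }

    final ConsumerModel consumer = new ConsumerModel();

    // Mirrors the shape of checkForTopologyUpdates(): on an update, either drop the
    // subscription entirely (last topology removed) or resubscribe to the remaining topics.
    void checkForTopologyUpdates(final Set<String> remainingSourceTopics, final boolean needsUpdate) {
        final boolean topologyEmpty = remainingSourceTopics.isEmpty();
        if (topologyEmpty || needsUpdate) {
            if (topologyEmpty) {
                consumer.unsubscribe();
            } else {
                consumer.subscribe(remainingSourceTopics); // stand-in for triggering a rebalance
            }
        }
    }

    public static void main(final String[] args) {
        final UnsubscribeOnEmptySketch thread = new UnsubscribeOnEmptySketch();
        thread.checkForTopologyUpdates(Set.of("input-A"), true); // one topology: subscribed
        try {
            thread.consumer.resetOffsets("input-A");
        } catch (final IllegalStateException e) {
            System.out.println("rejected while subscribed");
        }
        thread.checkForTopologyUpdates(Set.of(), true);          // last topology removed
        thread.consumer.resetOffsets("input-A");                 // now allowed
        System.out.println("subscribed: " + thread.consumer.isSubscribed());
    }
}
```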




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
