guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r754719711



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);

Review comment:
       Does the version start at 0 or at 1? If it starts at 0, is it 
possible that a newly added thread would not get notified of the initial 
version 0?
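
The question can be made concrete with a small standalone sketch. It mirrors `registerThread` and `needsUpdate` from the diff above, but the class `VersionStartSketch`, its constructor parameter, and `addTopology` are hypothetical names used only for illustration, and the real initial value of the topology version is not shown in this hunk, so it is left as a parameter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the version bookkeeping in TopologyMetadata.
class VersionStartSketch {
    private final AtomicLong topologyVersion;
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();

    // The initial topology version is an assumption supplied by the caller.
    public VersionStartSketch(final long initialTopologyVersion) {
        this.topologyVersion = new AtomicLong(initialTopologyVersion);
    }

    // Same as the diff: every new thread starts at version 0.
    public void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    // Same strict comparison as the diff.
    public boolean needsUpdate(final String threadName) {
        return threadVersions.get(threadName) < topologyVersion.get();
    }

    // Stands in for adding or removing a named topology.
    public void addTopology() {
        topologyVersion.incrementAndGet();
    }
}
```

With an initial topology version of 0, a freshly registered thread does not report `needsUpdate` until the first `addTopology` call, because `0 < 0` is false. With an initial version of 1, the same thread reports `needsUpdate` immediately.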

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
       Why do we add this `if` condition? I cannot tell what the motivation is here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
       Correct me if I'm thinking crazily here :P My read is that the topology 
version of a thread can ONLY be incremented (maybe by more than 1), but never 
decremented. So we do not actually need to remember the "waiting" versions 
as a list. Instead, we can just keep a list of futures for each thread, and 
when a thread has successfully completed `handleTopologyUpdates`, we can 
immediately complete all the currently maintained futures, since no existing 
future can ever be related to a version newer than the one this thread has 
just updated to.
   
   With that, we 1) can get rid of the `topologyVersionWaiters`, and 2) do not 
need the nested loop of while + stream() below to complete futures. Instead, we 
just keep a list of futures per thread, without any versions associated with 
them.
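
A minimal sketch of this per-thread idea follows. It is not the PR's code: the class `PerThreadTopologyWaiters` and the method `awaitAllThreads` are hypothetical names, `java.util.concurrent.CompletableFuture` is swapped in for `KafkaFutureImpl` so the example is self-contained, and it assumes that a thread calls `reachedLatestVersion` only after it has applied the newest topology version. Completing the futures of an unregistered thread is likewise an assumption about the desired behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper illustrating "a list of futures per thread".
class PerThreadTopologyWaiters {
    // Futures that are still waiting on each registered thread; no versions are stored.
    private final Map<String, List<CompletableFuture<Void>>> pendingByThread = new HashMap<>();

    public synchronized void registerThread(final String threadName) {
        pendingByThread.putIfAbsent(threadName, new ArrayList<>());
    }

    // Assumption: a thread that goes away should no longer block any waiter.
    public synchronized void unregisterThread(final String threadName) {
        final List<CompletableFuture<Void>> pending = pendingByThread.remove(threadName);
        if (pending != null) {
            pending.forEach(f -> f.complete(null));
        }
    }

    // Called when a topology is added or removed: returns a future that
    // completes once every currently registered thread has caught up.
    public synchronized CompletableFuture<Void> awaitAllThreads() {
        final List<CompletableFuture<Void>> perThread = new ArrayList<>();
        for (final List<CompletableFuture<Void>> pending : pendingByThread.values()) {
            final CompletableFuture<Void> f = new CompletableFuture<>();
            pending.add(f);
            perThread.add(f);
        }
        return CompletableFuture.allOf(perThread.toArray(new CompletableFuture[0]));
    }

    // Versions only move forward, so everything pending for this thread
    // can be completed in one pass, with no nested loop over versions.
    public synchronized void reachedLatestVersion(final String threadName) {
        final List<CompletableFuture<Void>> pending = pendingByThread.get(threadName);
        if (pending == null) {
            return;
        }
        pending.forEach(f -> f.complete(null));
        pending.clear();
    }
}
```

In this sketch the future returned by `awaitAllThreads` completes only after every thread that was registered at call time has reported in, which is the role the version-keyed waiter list plays in the diff.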

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -135,31 +189,42 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         }
     }
 
-    public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+    /**
+     * Adds the topology and registers a future that listens for all threads on the older version to see the update
+     */
+    public KafkaFuture<Void> registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         try {
             lock();
             version.topologyVersion.incrementAndGet();
             log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future));

Review comment:
       Please see my other comment: instead of keeping a single list of 
`topologyVersionWaiters`, could we just keep a list of futures per thread and, 
once the thread has updated, immediately complete all of them?



