guozhangwang commented on a change in pull request #11479: URL: https://github.com/apache/kafka/pull/11479#discussion_r754719711
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);

Review comment:
Does the version start at 0 or at 1? If it starts at 0, is it possible that a newly added thread would not get notified of the initial version 0?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
Why do we add this if condition? I cannot see the motivation here.
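
To make the question about the initial version in `registerThread` concrete, here is a minimal sketch of the strict `<` comparison in `needsUpdate`. The class `VersionCheckSketch`, its constructor argument, and `addTopology` are hypothetical stand-ins, not the real `TopologyMetadata`; the initial topology version is passed in because the quoted hunk does not show where it starts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the version bookkeeping quoted above; not the real TopologyMetadata.
class VersionCheckSketch {
    private final AtomicLong topologyVersion;
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();

    // Assumption: the initial topology version is configurable here only to compare both cases.
    VersionCheckSketch(final long initialTopologyVersion) {
        this.topologyVersion = new AtomicLong(initialTopologyVersion);
    }

    // Mirrors the quoted registerThread: every new thread starts at version 0.
    void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    // Mirrors the quoted needsUpdate: strictly-less-than comparison.
    boolean needsUpdate(final String threadName) {
        return threadVersions.get(threadName) < topologyVersion.get();
    }

    // Hypothetical helper that bumps the topology version, as adding a topology would.
    long addTopology() {
        return topologyVersion.incrementAndGet();
    }

    public static void main(final String[] args) {
        final VersionCheckSketch fromZero = new VersionCheckSketch(0L);
        fromZero.registerThread("thread-1");
        // Thread version 0 is not less than topology version 0.
        System.out.println(fromZero.needsUpdate("thread-1")); // false

        final VersionCheckSketch fromOne = new VersionCheckSketch(1L);
        fromOne.registerThread("thread-1");
        // Thread version 0 is less than topology version 1.
        System.out.println(fromOne.needsUpdate("thread-1")); // true
    }
}
```

Under these assumptions, a thread registered at 0 only sees an update if the topology version is already above 0 or is bumped afterwards.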
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
Correct me if I'm thinking crazily here :P

My read is that the topology version of a thread can ONLY be incremented (maybe by more than 1), but never decremented. So we do not actually need to remember the "waiting" versions as a list. Instead we can just keep a list of futures for each thread, and when a thread has successfully completed `handleTopologyUpdates`, we can immediately complete all the currently maintained futures, since no future can ever be related to a newer version than the one this thread has just updated to.

With that, we can 1) get rid of the `topologyVersionWaiters`, and 2) avoid the nested loop of while + stream() below to complete futures. Instead we just keep a plain list of futures per thread, with nothing else associated with them.
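
The per-thread-futures idea in the comment above could look roughly like the following sketch. It is an illustration under stated assumptions, not the PR's code: `PerThreadTopologyFutures` and `registerNewTopology` are hypothetical names, `java.util.concurrent.CompletableFuture` is swapped in for `KafkaFuture` to keep the example self-contained, and `synchronized` stands in for the topology lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "one list of futures per thread"; not the actual TopologyMetadata.
class PerThreadTopologyFutures {
    // Pending futures keyed by thread name; guarded by this object's monitor.
    private final Map<String, List<CompletableFuture<Void>>> pending = new HashMap<>();
    private long topologyVersion = 0L;

    synchronized void registerThread(final String threadName) {
        pending.put(threadName, new ArrayList<>());
    }

    synchronized long topologyVersion() {
        return topologyVersion;
    }

    // Bumps the version and returns a future that completes once every
    // currently registered thread has caught up.
    synchronized CompletableFuture<Void> registerNewTopology() {
        topologyVersion++;
        final List<CompletableFuture<Void>> perThread = new ArrayList<>();
        for (final List<CompletableFuture<Void>> futures : pending.values()) {
            final CompletableFuture<Void> future = new CompletableFuture<>();
            futures.add(future);
            perThread.add(future);
        }
        return CompletableFuture.allOf(perThread.toArray(new CompletableFuture[0]));
    }

    // Called after a thread has handled topology updates. Versions only grow,
    // so every future queued for this thread so far can be completed at once;
    // no version needs to be stored alongside the futures.
    synchronized void reachedLatestVersion(final String threadName) {
        final List<CompletableFuture<Void>> futures = pending.get(threadName);
        if (futures == null) {
            return; // unknown or unregistered thread: nothing to complete
        }
        futures.forEach(future -> future.complete(null));
        futures.clear();
    }

    public static void main(final String[] args) {
        final PerThreadTopologyFutures metadata = new PerThreadTopologyFutures();
        metadata.registerThread("thread-1");
        metadata.registerThread("thread-2");

        final CompletableFuture<Void> added = metadata.registerNewTopology();
        metadata.reachedLatestVersion("thread-1");
        System.out.println(added.isDone()); // false: thread-2 has not caught up yet
        metadata.reachedLatestVersion("thread-2");
        System.out.println(added.isDone()); // true
    }
}
```

The sketch relies on the thread reading the latest version under the same lock that guards `registerNewTopology`; without that, a thread could complete a future for a version it has not yet observed.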
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -135,31 +189,42 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         }
     }
 
-    public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+    /**
+     * Adds the topology and registers a future that listens for all threads on the older version to see the update
+     */
+    public KafkaFuture<Void> registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         try {
             lock();
             version.topologyVersion.incrementAndGet();
             log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future));

Review comment:
Please see my other comment: instead of keeping a single list of `topologyVersionWaiters`, could we just keep a list of futures per thread, and once updated immediately complete all of them?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org