wcarlson5 commented on a change in pull request #11479: URL: https://github.com/apache/kafka/pull/11479#discussion_r754759558
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);

Review comment:
   Before add and remove are called, the version is 0; once we add a topology, it becomes 1.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -135,31 +189,42 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         }
     }
 
-    public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+    /**
+     * Adds the topology and registers a future that listens for all threads on the older version to see the update
+     */
+    public KafkaFuture<Void> registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         try {
             lock();
             version.topologyVersion.incrementAndGet();
             log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future));

Review comment:
   responded to your other comment :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
   The reason I would prefer to keep it as a list of `topologyVersionWaiters` is that the topology can be updated again while threads are still reaching a version. Maybe I have been looking at this too long, but it seems easier to keep track of this way to me. The complexity is the number of times the topology has been updated times the number of threads. Realistically, the number of threads is bounded, so we can treat it as a constant. And we would have the same number of futures (one per topology update) to complete either way. I think the little extra verbosity makes it easier to really understand what is going on. But then maybe I am too close to this code :|

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
   If we are removing the last topology, we need to unsubscribe for a few reasons, one of which is that we can't reset offsets while still subscribed.
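
For illustration only, here is a minimal sketch of the waiter-list idea from the `reachedLatestVersion` comment above. It is not the code in this PR. The class name `TopologyVersionTracker`, the nested `Waiter` type, and the `updateTopology` method are hypothetical, and `java.util.concurrent.CompletableFuture` stands in for `KafkaFuture` so the example compiles on its own. It assumes a waiter may complete once the slowest registered thread has reached that waiter's version.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the version bookkeeping discussed in the review.
class TopologyVersionTracker {

    // One waiter per topology update; its future completes once every
    // registered thread has reached `version`.
    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future;

        Waiter(final long version, final CompletableFuture<Void> future) {
            this.version = version;
            this.future = future;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();
    private long topologyVersion = 0L; // 0 until the first add or remove

    void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
    }

    boolean needsUpdate(final String threadName) {
        lock.lock();
        try {
            return threadVersions.getOrDefault(threadName, 0L) < topologyVersion;
        } finally {
            lock.unlock();
        }
    }

    // Bumps the version and registers a future for this particular update.
    CompletableFuture<Void> updateTopology() {
        lock.lock();
        try {
            topologyVersion++;
            final CompletableFuture<Void> future = new CompletableFuture<>();
            waiters.add(new Waiter(topologyVersion, future));
            return future;
        } finally {
            lock.unlock();
        }
    }

    // Called by a thread once it has caught up with the current version.
    void reachedLatestVersion(final String threadName) {
        lock.lock();
        try {
            threadVersions.put(threadName, topologyVersion);
            // The slowest thread decides which waiters can be completed.
            final long minVersion = threadVersions.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(topologyVersion);
            final Iterator<Waiter> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                final Waiter waiter = iterator.next();
                if (waiter.version <= minVersion) {
                    waiter.future.complete(null);
                    iterator.remove();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Each call to `reachedLatestVersion` scans the outstanding waiters, so the total work is roughly the number of topology updates times the number of threads, which matches the complexity argument in the comment. Because each update has its own waiter, an update that arrives while threads are still catching up gets its own future and does not delay completion of the earlier one.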