ableegoldman commented on a change in pull request #11479: URL: https://github.com/apache/kafka/pull/11479#discussion_r752809604
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -108,6 +124,48 @@ private void unlock() { version.topologyLock.unlock(); } + public Collection<String> sourceTopologies(final String name) { + return builders.get(name).sourceTopicCollection(); + } + + public boolean needsUpdate(final String threadName) { + return threadVersions.get(threadName) < topologyVersion(); + } + + public void registerThread(final String threadName) { + threadVersions.put(threadName, 0L); + } + + public void unregisterThread(final String threadName) { + threadVersions.remove(threadName); + } + + + public boolean reachedLatestVersion(final String threadName) { + boolean rebalance = false; + try { + lock(); + final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator(); + TopologyVersionWaiters topologyVersionWaiters; + threadVersions.put(threadName, topologyVersion()); + while (iterator.hasNext()) { + topologyVersionWaiters = iterator.next(); + final long verison = topologyVersionWaiters.topologyVersion; + if (verison <= threadVersions.get(threadName)) { + if (threadVersions.values().stream().allMatch(t -> t >= verison)) { + topologyVersionWaiters.future.complete(null); + iterator.remove(); + log.info("thread {} is now on on version {}", threadName, topologyVersionWaiters.topologyVersion); Review comment: ```suggestion log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion); ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -108,6 +124,48 @@ private void unlock() { version.topologyLock.unlock(); } + public Collection<String> sourceTopologies(final String name) { + return builders.get(name).sourceTopicCollection(); + } + + public boolean needsUpdate(final String threadName) { + return threadVersions.get(threadName) < topologyVersion(); + } + + public void 
registerThread(final String threadName) { + threadVersions.put(threadName, 0L); + } + + public void unregisterThread(final String threadName) { + threadVersions.remove(threadName); + } + Review comment: super nit: extra line break ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -108,6 +124,48 @@ private void unlock() { version.topologyLock.unlock(); } + public Collection<String> sourceTopologies(final String name) { + return builders.get(name).sourceTopicCollection(); + } + + public boolean needsUpdate(final String threadName) { + return threadVersions.get(threadName) < topologyVersion(); + } + + public void registerThread(final String threadName) { + threadVersions.put(threadName, 0L); + } + + public void unregisterThread(final String threadName) { + threadVersions.remove(threadName); + } + + + public boolean reachedLatestVersion(final String threadName) { + boolean rebalance = false; + try { + lock(); + final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator(); + TopologyVersionWaiters topologyVersionWaiters; + threadVersions.put(threadName, topologyVersion()); + while (iterator.hasNext()) { + topologyVersionWaiters = iterator.next(); + final long verison = topologyVersionWaiters.topologyVersion; Review comment: nit: typo in "verison". Also, since `version` is already taken by the `TopologyVersion` field, we should rename either that field or this variable (it probably makes sense to rename the field to `topologyVersion`, `currentTopologyVersion`, `latestTopologyVersion`, etc.) ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * Add a new NamedTopology to a running Kafka Streams app.
If multiple instances of the application are running, - * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { + log.debug("Adding topology: {}", newTopology.name()); if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure * it stops processing the old topology. 
* + * @param topologyToRemove name of the topology to be removed + * @param resetOffsets whether to reset the committed offsets for any source topics + * * @throws IllegalArgumentException if this topology name cannot be found * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void removeNamedTopology(final String topologyToRemove) { + public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) { + log.debug("Removing topology: {}", topologyToRemove); if (!isRunningOrRebalancing()) { throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); } else if (!getTopologyByName(topologyToRemove).isPresent()) { throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); } + final Set<TopicPartition> partitionsToReset = metadataForLocalThreads() + .stream() + .flatMap(t -> { + final HashSet<TaskMetadata> tasks = new HashSet<>(); + tasks.addAll(t.activeTasks()); + tasks.addAll(t.standbyTasks()); + return tasks.stream(); + }) + .flatMap(t -> t.topicPartitions().stream()) + .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic())) + .collect(Collectors.toSet()); + + + + final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove); + + if (resetOffsets) { + log.info("partitions to reset: {}", partitionsToReset); + if (!partitionsToReset.isEmpty()) { + try { + removeTopologyFuture.get(); Review comment: We shouldn't call `get()` on the futures, that's up to the user to do. 
Technically speaking, the point of this isn't to make the APIs blocking, but to enable users to block on their completion -- i.e., we want to return the `RemoveNamedTopologyResult` ASAP and allow the user to block on it if they want. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -578,6 +577,7 @@ public void run() { failedStreamThreadSensor.record(); this.streamsUncaughtExceptionHandler.accept(e); } finally { + topologyMetadata.unregisterThread(threadMetadata.threadName()); Review comment: This should be moved inside the `completeShutdown()` method. We register the thread inside the StreamThread constructor, but it's possible for the thread to be shut down before it ever starts running, in which case we'd still want to make sure it gets unregistered. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -108,6 +124,48 @@ private void unlock() { version.topologyLock.unlock(); } + public Collection<String> sourceTopologies(final String name) { Review comment: ```suggestion public Collection<String> sourceTopicsForTopology(final String name) { ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * Add a new NamedTopology to a running Kafka Streams app.
* * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { + log.debug("Adding topology: {}", newTopology.name()); if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure * it stops processing the old topology. 
* + * @param topologyToRemove name of the topology to be removed + * @param resetOffsets whether to reset the committed offsets for any source topics + * * @throws IllegalArgumentException if this topology name cannot be found * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void removeNamedTopology(final String topologyToRemove) { + public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) { + log.debug("Removing topology: {}", topologyToRemove); if (!isRunningOrRebalancing()) { throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); } else if (!getTopologyByName(topologyToRemove).isPresent()) { throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); } + final Set<TopicPartition> partitionsToReset = metadataForLocalThreads() + .stream() + .flatMap(t -> { + final HashSet<TaskMetadata> tasks = new HashSet<>(); + tasks.addAll(t.activeTasks()); + tasks.addAll(t.standbyTasks()); Review comment: We don't need to include standbys, since they don't commit offsets on the input topics (and IMO we should not reset offsets on changelog or repartition topics, but leave it up to the user to clean those up as they see fit). ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -108,6 +124,48 @@ private void unlock() { version.topologyLock.unlock(); } + public Collection<String> sourceTopologies(final String name) { + return builders.get(name).sourceTopicCollection(); + } + + public boolean needsUpdate(final String threadName) { + return threadVersions.get(threadName) < topologyVersion(); + } + + public void
unregisterThread(final String threadName) { + threadVersions.remove(threadName); + } + + + public boolean reachedLatestVersion(final String threadName) { + boolean rebalance = false; + try { + lock(); + final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator(); + TopologyVersionWaiters topologyVersionWaiters; + threadVersions.put(threadName, topologyVersion()); + while (iterator.hasNext()) { + topologyVersionWaiters = iterator.next(); + final long verison = topologyVersionWaiters.topologyVersion; + if (verison <= threadVersions.get(threadName)) { + if (threadVersions.values().stream().allMatch(t -> t >= verison)) { + topologyVersionWaiters.future.complete(null); + iterator.remove(); + log.info("thread {} is now on on version {}", threadName, topologyVersionWaiters.topologyVersion); + rebalance = true; Review comment: This is a little tricky, but I think it might make sense to do the reverse. It has to do with how the thread handles being assigned tasks that it doesn't recognize because it hasn't yet updated its topology: it just makes a note of the assigned task name and stashes it away until we see a topology update, at which point we check whether the task was part of the new topology and, if so, go on to create it. Which is a long way of saying that it's better to rebalance earlier rather than later, i.e., to rebalance when the _first_ thread processes a topology update rather than when the _last_ thread has processed the update.
Partly so that the first thread can go ahead and start processing tasks from that topology right away, and partly because if we wait until the last thread acks the topology update, we could get into trouble and, in extreme cases, end up never rebalancing -- for example, if a thread gets stuck in processing (like if the user has bad custom logic that sends it into an infinite loop, which is a real example from an escalation I once had). Happy to chat over Zoom if you want to dig into this a bit more. I thought I had left a more specific TODO comment about when we do/don't want to rebalance, but I just read it again and it's wildly unhelpful 😅 ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -61,6 +74,8 @@ public KafkaStreamsNamedTopologyWrapper(final Properties props, final KafkaClien private KafkaStreamsNamedTopologyWrapper(final StreamsConfig config, final KafkaClientSupplier clientSupplier) { super(new TopologyMetadata(new ConcurrentSkipListMap<>(), config), config, clientSupplier); + final LogContext logContext = new LogContext(); Review comment: I think you can just instantiate the field like this instead of going through the `LogContext`. I checked that class, and it seems to do the same thing as just ``` private final Logger log = LoggerFactory.getLogger(KafkaStreamsNamedTopologyWrapper.class); ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * Add a new NamedTopology to a running Kafka Streams app.
If multiple instances of the application are running, - * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { + log.debug("Adding topology: {}", newTopology.name()); if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure * it stops processing the old topology. 
* + * @param topologyToRemove name of the topology to be removed + * @param resetOffsets whether to reset the committed offsets for any source topics + * * @throws IllegalArgumentException if this topology name cannot be found * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void removeNamedTopology(final String topologyToRemove) { + public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) { + log.debug("Removing topology: {}", topologyToRemove); if (!isRunningOrRebalancing()) { throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); } else if (!getTopologyByName(topologyToRemove).isPresent()) { throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); } + final Set<TopicPartition> partitionsToReset = metadataForLocalThreads() + .stream() + .flatMap(t -> { + final HashSet<TaskMetadata> tasks = new HashSet<>(); + tasks.addAll(t.activeTasks()); + tasks.addAll(t.standbyTasks()); + return tasks.stream(); + }) + .flatMap(t -> t.topicPartitions().stream()) + .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic())) + .collect(Collectors.toSet()); + + + Review comment: nit: extra line breaks ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * Add a new NamedTopology to a running Kafka Streams app. 
If multiple instances of the application are running, - * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { + log.debug("Adding topology: {}", newTopology.name()); if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure * it stops processing the old topology. 
* + * @param topologyToRemove name of the topology to be removed + * @param resetOffsets whether to reset the committed offsets for any source topics + * * @throws IllegalArgumentException if this topology name cannot be found * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void removeNamedTopology(final String topologyToRemove) { + public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) { + log.debug("Removing topology: {}", topologyToRemove); if (!isRunningOrRebalancing()) { throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); } else if (!getTopologyByName(topologyToRemove).isPresent()) { throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); } + final Set<TopicPartition> partitionsToReset = metadataForLocalThreads() + .stream() + .flatMap(t -> { + final HashSet<TaskMetadata> tasks = new HashSet<>(); Review comment: super nit: ```suggestion final Set<TaskMetadata> tasks = new HashSet<>(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org