guozhangwang commented on a change in pull request #11479: URL: https://github.com/apache/kafka/pull/11479#discussion_r750795877
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -121,9 +164,9 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         if (isEmpty() && threadState.get().isAlive()) {
             try {
                 lock();
-                while (isEmpty() && threadState.get().isAlive()) {

Review comment:
       See my other comment: I think this function `maybeWaitForNonEmptyTopology` would not be needed any more, since its only value is the `version.topologyCV.await();` part, which we can just move up to the caller's while loop.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       As for the future: I think it's okay to let the first future wait a bit longer if there are consecutive operations, i.e. in the above implementation we would complete all futures that have been registered so far once every still-alive thread has eventually caught up with the bumped version.
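
To make the point about futures concrete, here is a minimal sketch of "complete every registered future once all still-alive threads have caught up". The class name `PendingTopologyUpdates`, its methods, and the use of `java.util.concurrent.CompletableFuture` are assumptions for illustration only; they are not taken from the PR, which may well use a different future type and bookkeeping.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only: not the actual TopologyMetadata implementation.
final class PendingTopologyUpdates {
    // Futures handed out by add/remove calls that have not been completed yet.
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private long topologyVersion = 0L;

    // Called once per add/remove operation: bump the version and register a future.
    synchronized CompletableFuture<Void> registerUpdate() {
        topologyVersion++;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Called with the versions last seen by all still-alive threads.
    // Only when every one of them has reached the latest version do we
    // complete all futures registered so far, including the earliest one.
    // (An empty collection counts as "all caught up" in this sketch.)
    synchronized void maybeCompleteFutures(final Collection<Long> liveThreadVersions) {
        final boolean allCaughtUp =
            liveThreadVersions.stream().allMatch(v -> v >= topologyVersion);
        if (allCaughtUp) {
            pending.forEach(f -> f.complete(null));
            pending.clear();
        }
    }
}
```

With two consecutive updates, the first future simply stays pending until the threads reach the second version, which is the "waiting a bit more" trade-off described above.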
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-

Review comment:
       For line 915 below: do we still need this line? Could we just inline the `Condition topologyCV` waiting within this while loop now?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +134,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException if streams has not been started or has already shut down
      * @throws TopologyException if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.error("adding {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }

     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove name of the topology to be removed
+     * @param resetOffsets whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException if streams has not been started or has already shut down
      * @throws TopologyException if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.error("Removing {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+//            .filter(t -> topologyMetadata.sourceTopicCollection().contains(t))

Review comment:
       Intentional?

Review comment (same file and line as above):
       Also, this logic seems to just include all partitions of all tasks, since it does not involve `topologyToRemove` at all?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -83,6 +100,7 @@ public TopologyMetadata(final InternalTopologyBuilder builder,
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
+        getStreamThreadCount = () -> getNumStreamThreads(config);

Review comment:
       Why do we need to initialize this supplier in the constructor (ditto below)? This variable is only called in `reachedVersion`, by which point the stream threads have already been initialized.
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       I feel this is getting more complicated than necessary for KAFKA-12648: we have a `topologyMetadata` at the `KafkaStreams` layer, while at the `StreamThread` layer we keep a `long lastSeenTopologyVersion`. If we let the `topologyMetadata` object, which is shared among all threads (as well as their task managers etc.), track each thread's version, then we can simplify this. Just a sketchy thought here:

       * In `TopologyMetadata` we maintain a `Map<String, Long>` from thread name to that thread's current topology version. When threads are added or removed, this map would be updated as well (in a synchronized way).
       * Inside a thread's `checkForTopologyUpdates`, in a synchronized block, check whether the versions of all threads except this one are equal to the current version: if yes, this thread would update its own map entry as well and then trigger a rebalance; otherwise, it would only update its map entry.

       So if consecutive topology updates are issued, we would only trigger a rebalance at the end, when all threads have reached the final topology version.
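
A rough sketch of the per-thread version map proposed above, to show the intended control flow. Everything here is hypothetical: `ThreadVersionTracker` and its method names are made up for illustration and do not exist in `TopologyMetadata`. It also adds one assumption not spelled out in the comment, namely that a thread already at the current version does not trigger another rebalance.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: names are illustrative, not the real TopologyMetadata API.
final class ThreadVersionTracker {
    // Thread name -> topology version that thread has caught up to.
    private final Map<String, Long> threadVersions = new HashMap<>();
    private long topologyVersion = 0L;

    // Keep the map in sync when threads are added or removed.
    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
    }

    synchronized void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
    }

    // Called by add/remove topology operations.
    synchronized long bumpTopologyVersion() {
        return ++topologyVersion;
    }

    // Called from a thread's update check. Returns true only for the last
    // thread to catch up, which is the one that should trigger the rebalance.
    synchronized boolean updateAndCheckShouldRebalance(final String threadName) {
        final Long previous = threadVersions.get(threadName);
        // Assumption: unknown threads and threads already up to date do nothing.
        if (previous == null || previous == topologyVersion) {
            return false;
        }
        boolean othersCaughtUp = true;
        for (final Map.Entry<String, Long> entry : threadVersions.entrySet()) {
            if (!entry.getKey().equals(threadName) && entry.getValue() != topologyVersion) {
                othersCaughtUp = false;
                break;
            }
        }
        // Always record that this thread has now seen the current version.
        threadVersions.put(threadName, topologyVersion);
        return othersCaughtUp;
    }
}
```

With two threads and two back-to-back version bumps, only the thread that catches up last gets `true`, so a single rebalance is triggered for the whole batch of updates.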