junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586990075
##########
File path:
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
log.debug("Applied ISR change record: {}", record.toString());
}
+ public void replay(RemoveTopicRecord record) {
+ // Remove this topic from the topics map and the topicsByName map.
+ TopicControlInfo topic = topics.remove(record.topicId());
+ if (topic == null) {
+ throw new RuntimeException("Can't find topic with ID " +
record.topicId() +
+ " to remove.");
+ }
+ topicsByName.remove(topic.name);
+
+ // Delete the configurations associated with this topic.
+ configurationControl.deleteTopicConfigs(topic.name);
+
+ // Remove the entries for this topic in brokersToIsrs.
+ for (PartitionControlInfo partition : topic.parts.values()) {
+ for (int i = 0; i < partition.isr.length; i++) {
+ brokersToIsrs.removeTopicEntryForBroker(topic.id,
partition.isr[i]);
+ }
+ }
+ brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
Review comment:
It's true and a partition could have isr and no leader. However, in that
case, `isrMembers` in brokersToIsrs will still be updated with key from
replicaId in isr and isr will never have -1 in its list. The noLeader info is
only stored in the value of `isrMembers`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]