rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1114673066


##########
metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java:
##########
@@ -126,29 +127,27 @@ public void handleMetadataVersionChange(MetadataVersion 
newVersion) {
     }
 
     public TopicsImage apply() {
-        Map<Uuid, TopicImage> newTopicsById = new 
HashMap<>(image.topicsById().size());
-        Map<String, TopicImage> newTopicsByName = new 
HashMap<>(image.topicsByName().size());
-        for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-            Uuid id = entry.getKey();
-            TopicImage prevTopicImage = entry.getValue();
-            TopicDelta delta = changedTopics.get(id);
-            if (delta == null) {
-                if (!deletedTopicIds.contains(id)) {
-                    newTopicsById.put(id, prevTopicImage);
-                    newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-                }
+        ImMap<Uuid, TopicImage> newTopicsById = image.topicsById;
+        ImMap<String, TopicImage> newTopicsByName = image.topicsByName;
+        // apply all the deletes
+        for (Uuid topicId: deletedTopicIds) {
+            // it was deleted, so we have to remove it from the maps
+            TopicImage originalTopicToBeDeleted = 
image.topicsById.get(topicId);
+            if (originalTopicToBeDeleted == null) {
+                throw new IllegalStateException("Missing topic id " + topicId);
             } else {
-                TopicImage newTopicImage = delta.apply();
-                newTopicsById.put(id, newTopicImage);
-                newTopicsByName.put(delta.name(), newTopicImage);
+                newTopicsById = newTopicsById.without(topicId);
+                newTopicsByName = 
newTopicsByName.without(originalTopicToBeDeleted.name());
             }
         }
-        for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-            if (!newTopicsById.containsKey(entry.getKey())) {
-                TopicImage newTopicImage = entry.getValue().apply();
-                newTopicsById.put(newTopicImage.id(), newTopicImage);
-                newTopicsByName.put(newTopicImage.name(), newTopicImage);
-            }
+        // apply all the updates/additions
+        for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+            // put new information into the maps
+            String topicName = newTopicToBeAddedOrUpdated.name();
+            newTopicsById = newTopicsById.assoc(topicId, 
newTopicToBeAddedOrUpdated);
+            newTopicsByName = newTopicsByName.assoc(topicName, 
newTopicToBeAddedOrUpdated);

Review Comment:
   Persistent data structures are immutable, so they always create and return a 
new data structure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to