cmccabe commented on code in PR #13606:
URL: https://github.com/apache/kafka/pull/13606#discussion_r1170454667


##########
core/src/main/scala/kafka/migration/MigrationPropagator.scala:
##########
@@ -70,14 +70,24 @@ class MigrationPropagator(
 
   override def publishMetadata(image: MetadataImage): Unit = {
     val oldImage = _image
-    val addedBrokers = new 
util.HashSet[Integer](image.cluster().brokers().keySet())
-    addedBrokers.removeAll(oldImage.cluster().brokers().keySet())
-    val removedBrokers = new 
util.HashSet[Integer](oldImage.cluster().brokers().keySet())
-    removedBrokers.removeAll(image.cluster().brokers().keySet())
-
-    removedBrokers.asScala.foreach(id => channelManager.removeBroker(id))
-    addedBrokers.asScala.foreach(id =>
-      
channelManager.addBroker(Broker.fromBrokerRegistration(image.cluster().broker(id))))
+    val prevBrokers = oldImage.cluster().brokers().values().asScala
+      .filter(_.isMigratingZkBroker)
+      .filterNot(_.fenced)
+      .map(Broker.fromBrokerRegistration)
+      .toSet
+
+    val aliveBrokers = image.cluster().brokers().values().asScala
+      .filter(_.isMigratingZkBroker)
+      .filterNot(_.fenced)
+      .map(Broker.fromBrokerRegistration)
+      .toSet
+
+    val addedBrokers = aliveBrokers -- prevBrokers
+    val removedBrokers = prevBrokers -- aliveBrokers
+
+    stateChangeLogger.logger.debug(s"Adding brokers $addedBrokers, removing 
brokers $removedBrokers.")

Review Comment:
   Can we make this INFO level, and only log it if addedBrokers or
   removedBrokers is non-empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to