cmccabe commented on code in PR #13368:
URL: https://github.com/apache/kafka/pull/13368#discussion_r1137526482


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
     }
   }
 
+  def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    // This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial)
+    var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
+    def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
+      allAcls = allAcls.updated(resourcePattern, versionedAcls)
+    }
+
+    AclAuthorizer.loadAllAcls(zkClient, this, updateAcls)
+    allAcls.foreach { case (resourcePattern, versionedAcls) =>

Review Comment:
   The main issue I see here is that batches could become too big. Could you add some code to limit batches to 100 records or so?
   
   We had a similar bug in snapshot generation until we decoupled batching from record generation, so we should probably do the same thing here: recordConsumer should take individual records and do the batching itself. Since we'll be in a metadata transaction, the batch boundaries carry no semantic meaning here, so the consumer is free to cut batches wherever it likes.
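   A minimal sketch of what that decoupling could look like (the `BufferingConsumer` name, the 100-record default, and the explicit `flush()` helper are illustrative assumptions, not code from this PR):
   
   ```scala
   import java.util.function.Consumer
   import scala.collection.mutable.ArrayBuffer
   
   // Hypothetical consumer that accepts one record at a time and batches
   // internally, emitting at most maxBatchSize records per batch.
   class BufferingConsumer[T](
     emitBatch: java.util.List[T] => Unit,
     maxBatchSize: Int = 100
   ) extends Consumer[T] {
     private val buffer = new ArrayBuffer[T]()
   
     override def accept(record: T): Unit = {
       buffer += record
       if (buffer.size >= maxBatchSize) flush()
     }
   
     // Emit whatever is buffered as one batch. Callers must invoke this
     // once after the last record so the final partial batch is not lost.
     def flush(): Unit = {
       if (buffer.nonEmpty) {
         val batch = new java.util.ArrayList[T](buffer.size)
         buffer.foreach(batch.add)
         emitBatch(batch)
         buffer.clear()
       }
     }
   }
   ```
   
   With something along these lines, migrateAcls could hand individual ApiMessageAndVersion records to the consumer and call flush() once after the foreach, rather than sizing batches itself.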


