cmccabe commented on code in PR #13368: URL: https://github.com/apache/kafka/pull/13368#discussion_r1137526482
########## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ########## @@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo } } + def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = { + // This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial) + var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + allAcls = allAcls.updated(resourcePattern, versionedAcls) + } + + AclAuthorizer.loadAllAcls(zkClient, this, updateAcls) + allAcls.foreach { case (resourcePattern, versionedAcls) => Review Comment: The main issue that I see here is that batches could become too big. Could you add some code to limit batches to 100 records or so? We had a similar bug in snapshot generation until we decoupled batching from record generation. So we should probably do the same thing here. In other words, recordConsumer should take individual records and do the batching itself. Since we'll be in a metadata transaction, the batching is not meaningful here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org