pprovenzano commented on code in PR #13628:
URL: https://github.com/apache/kafka/pull/13628#discussion_r1179574482


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -568,19 +571,58 @@ public void run() throws Exception {
                     });
                 }
 
-                // For configs and client quotas, we need to send all of the 
data to the ZK client since we persist
-                // everything for a given entity in a single ZK node.
+                // For configs and client quotas, we need to send all of the 
data to the ZK
+                // client since we persist everything for a given entity in a 
single ZK node.
                 if (delta.configsDelta() != null) {
                     delta.configsDelta().changes().forEach((configResource, 
configDelta) ->
                         apply("Updating config resource " + configResource, 
migrationState ->
                             zkMigrationClient.writeConfigs(configResource, 
image.configs().configMapForResource(configResource), migrationState)));
                 }
 
-                if (delta.clientQuotasDelta() != null) {
-                    
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
-                        Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        apply("Updating client quota " + clientQuotaEntity, 
migrationState ->
-                            
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
migrationState));
+                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() 
!= null)) {
+
+                    // A list of users with scram or quota changes
+                    HashSet<String> users = new HashSet<String>();
+
+                    // Populate list with users with scram changes
+                    if (delta.scramDelta() != null) {
+                        delta.scramDelta().changes().forEach((scramMechanism, 
changes) -> {
+                            changes.forEach((userName, changeOpt) -> 
users.add(userName));
+                        });
+                    }
+
+                    // Populate list with users with quota changes 
+                    // and apply quota changes to all non user quota changes
+                    if (delta.clientQuotasDelta() != null) {
+                        Map<String, String> scramMap = new HashMap<String, 
String>();
+                        
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
+
+                            if 
((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                                
(!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                                String userName = 
clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                                // Add clientQuotaEntity to list to process at 
the end
+                                users.add(userName);
+                            } else {
+                                Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                                apply("Updating client quota " + 
clientQuotaEntity, migrationState -> 
+                                    
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
scramMap, migrationState));
+                            }
+                        });
+                    }
+                    // Updateuser scram and quota data for each user with 
changes in either.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to