Serwios commented on code in PR #18130:
URL: https://github.com/apache/kafka/pull/18130#discussion_r1878925772


##########
clients/src/main/java/org/apache/kafka/server/authorizer/DefaultResourceTypeBasedAuthorization.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.kafka.server.authorizer;
+
+import org.apache.kafka.common.acl.*;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+
+import java.util.*;
+
+public class DefaultResourceTypeBasedAuthorization {
+    private DefaultResourceTypeBasedAuthorization() {}
+
+    public static AuthorizationResult authorize(
+            AuthorizableRequestContext requestContext,
+            AclOperation op,
+            ResourceType resourceType,
+            Authorizer authorizer
+    ) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        if (isSuperUserAuthorized(requestContext, op, resourceType, 
authorizer)) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        Map<PatternType, Set<String>> denyPatterns = initializePatternMap();
+        Map<PatternType, Set<String>> allowPatterns = initializePatternMap();
+
+        boolean hasWildcardAllow = false;
+        KafkaPrincipal principal = createKafkaPrincipal(requestContext);
+        String hostAddr = requestContext.clientAddress().getHostAddress();
+
+        for (AclBinding binding : 
authorizer.acls(createAclBindingFilter(resourceType))) {
+            if (!isValidBinding(binding, principal, hostAddr, op)) {
+                continue;
+            }
+
+            if (binding.entry().permissionType() == AclPermissionType.DENY) {
+                processDenyAcl(binding, denyPatterns);
+                continue;
+            }
+
+            if (binding.entry().permissionType() == AclPermissionType.ALLOW) {
+                processAllowAcl(binding, allowPatterns);
+                if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+                    hasWildcardAllow = true;
+                }
+            }
+        }
+
+        if (hasWildcardAllow) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        return evaluateAuthorizationResult(allowPatterns, denyPatterns);
+    }
+
+    private static boolean isSuperUserAuthorized(AuthorizableRequestContext 
requestContext, AclOperation op, ResourceType resourceType, Authorizer 
authorizer) {
+        return authorizer.authorize(requestContext, 
Collections.singletonList(new Action(
+                        op, new ResourcePattern(resourceType, "hardcode", 
PatternType.LITERAL),
+                        0, true, false)))
+                .getFirst() == AuthorizationResult.ALLOWED;
+    }
+
+    private static Map<PatternType, Set<String>> initializePatternMap() {
+        return new EnumMap<>(PatternType.class) {{
+            put(PatternType.LITERAL, new HashSet<>());
+            put(PatternType.PREFIXED, new HashSet<>());
+        }};
+    }
+
+    private static KafkaPrincipal 
createKafkaPrincipal(AuthorizableRequestContext requestContext) {
+        return new KafkaPrincipal(
+                requestContext.principal().getPrincipalType(),
+                requestContext.principal().getName());
+    }
+
+    private static AclBindingFilter createAclBindingFilter(ResourceType 
resourceType) {
+        ResourcePatternFilter resourcePatternFilter = new 
ResourcePatternFilter(resourceType, null, PatternType.ANY);
+        return new AclBindingFilter(resourcePatternFilter, 
AccessControlEntryFilter.ANY);
+    }
+
+    private static boolean isValidBinding(AclBinding binding, KafkaPrincipal 
principal, String hostAddr, AclOperation op) {
+        return (binding.entry().host().equals(hostAddr) || 
binding.entry().host().equals("*"))
+                && 
(SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
 || binding.entry().principal().equals("User:*"))
+                && (binding.entry().operation() == op || 
binding.entry().operation() == AclOperation.ALL);

Review Comment:
   Thanks, simplified in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to