rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r526479473
########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +150,129 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. Review comment: We should document what this default implementation does and why a custom implementation may want to override this default. ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +150,129 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. Review comment: We should document what this default implementation does and why a custom implementation may want to override this default. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message Review comment: We should have exactly one call to `logAuditMessage` that says whether access was allowed or denied. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + + if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + + if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + + if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + + val allowPatterns = matchingPatterns( + requestContext.principal().toString, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.ALLOW + ) + + val denyPatterns = matchingPatterns( + requestContext.principal().toString, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.DENY Review comment: We should optimize for the case where there are no DENY acls. There is no point in finding all matching ALLOW entries in that case, we would just need to check for one ALLOW. ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -988,6 +1000,369 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { } } + @Test + def testAuthorizeAnyDurability(): Unit = { Review comment: As before, references to Durability in authorizer tests are confusing. ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.java ########## @@ -0,0 +1,4 @@ +package unit.kafka.security.authorizer; + +public class AuthorizerInterfaceDefaultTest { Review comment: Are we going to add tests here? ########## File path: core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.{lang, util} +import java.util.concurrent.CompletionStage + +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} +import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, AuthorizerServerInfo} + +object MockAuthorizer { + val authorizer = new AclAuthorizer +} + +/** + * A mock authorizer for testing the interface default + */ +class MockAuthorizer extends Authorizer { Review comment: We could mock this fully instead of using AclAuthorizer? ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +150,129 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ + default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + if (resourceType == ResourceType.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter resource type for authorizeByResourceType"); + } + + if (resourceType == ResourceType.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown resource type"); + } + + if (op == AclOperation.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter operation type for authorizeByResourceType"); + } + + if (op == AclOperation.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown operation type"); + } + + ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY); + AclBindingFilter aclFilter = new AclBindingFilter( + resourceTypeFilter, AccessControlEntryFilter.ANY); + + Set<String> denyLiterals = new HashSet<>(); + Set<String> denyPrefixes = new HashSet<>(); + Set<String> allowLiterals = new HashSet<>(); + Set<String> allowPrefixes = new HashSet<>(); + boolean hasWildCardAllow = false; + + for (AclBinding binding : acls(aclFilter)) { + if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress()) + && !binding.entry().host().equals("*")) + continue; + + if (!binding.entry().principal().equals(requestContext.principal().toString()) Review comment: request.principal can be a custom extension of KafkaPrincipal, we cannot use toString for comparison ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +150,129 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ + default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + if (resourceType == ResourceType.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter resource type for authorizeByResourceType"); + } + + if (resourceType == ResourceType.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown resource type"); + } + + if (op == AclOperation.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter operation type for authorizeByResourceType"); + } + + if (op == AclOperation.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown operation type"); + } + + ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY); + AclBindingFilter aclFilter = new AclBindingFilter( + resourceTypeFilter, AccessControlEntryFilter.ANY); + + Set<String> denyLiterals = new HashSet<>(); + Set<String> denyPrefixes = new HashSet<>(); + Set<String> allowLiterals = new HashSet<>(); + Set<String> allowPrefixes = new HashSet<>(); + boolean hasWildCardAllow = false; + + for (AclBinding binding : acls(aclFilter)) { + if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress()) + && !binding.entry().host().equals("*")) + continue; + + if (!binding.entry().principal().equals(requestContext.principal().toString()) + && !binding.entry().principal().equals("User:*")) + continue; + + if (binding.entry().operation() != op + && binding.entry().operation() != AclOperation.ALL) + continue; + + if (binding.entry().permissionType() == AclPermissionType.DENY) { + switch (binding.pattern().patternType()) { + case LITERAL: + if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) + return AuthorizationResult.DENIED; + denyLiterals.add(binding.pattern().name()); Review comment: nit: indentation ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + + if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + + if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + + if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + + val allowPatterns = matchingPatterns( + requestContext.principal().toString, Review comment: Same as in the Authorizer default method, we cannot use request.principal().toString() ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -130,6 +130,11 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + @volatile + private var resourceCache = new scala.collection.immutable.HashMap[AccessControlEntry, + scala.collection.mutable.HashSet[ResourcePattern]]() Review comment: Not sure it is worth making a whole copy of this structure for a method that is not used frequently. It will be good to add microbenchmarks to `AclAuthorizerBenchmark` to understand how the new method performs. ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +150,129 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ + default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + if (resourceType == ResourceType.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter resource type for authorizeByResourceType"); + } + + if (resourceType == ResourceType.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown resource type"); + } + + if (op == AclOperation.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter operation type for authorizeByResourceType"); + } + + if (op == AclOperation.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown operation type"); + } + + ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY); + AclBindingFilter aclFilter = new AclBindingFilter( + resourceTypeFilter, AccessControlEntryFilter.ANY); + + Set<String> denyLiterals = new HashSet<>(); + Set<String> denyPrefixes = new HashSet<>(); + Set<String> allowLiterals = new HashSet<>(); + Set<String> allowPrefixes = new HashSet<>(); + boolean hasWildCardAllow = false; + + for (AclBinding binding : acls(aclFilter)) { + if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress()) + && !binding.entry().host().equals("*")) + continue; + + if (!binding.entry().principal().equals(requestContext.principal().toString()) + && !binding.entry().principal().equals("User:*")) + continue; + + if (binding.entry().operation() != op + && binding.entry().operation() != AclOperation.ALL) + continue; + + if (binding.entry().permissionType() == AclPermissionType.DENY) { + switch (binding.pattern().patternType()) { + case LITERAL: + if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) + return AuthorizationResult.DENIED; + denyLiterals.add(binding.pattern().name()); + break; + case PREFIXED: + denyPrefixes.add(binding.pattern().name()); + break; + } + continue; + } + + if (binding.entry().permissionType() != AclPermissionType.ALLOW) + continue; + + switch (binding.pattern().patternType()) { + case LITERAL: + if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) { + hasWildCardAllow = true; + continue; + } + allowLiterals.add(binding.pattern().name()); + break; + case PREFIXED: + allowPrefixes.add(binding.pattern().name()); + break; + } + } + + if (hasWildCardAllow) { + return AuthorizationResult.ALLOWED; + } + + for (String allowPrefix : allowPrefixes) { + StringBuilder sb = new StringBuilder(); + boolean hasDominatedDeny = false; + for (char ch : allowPrefix.toCharArray()) { + sb.append(ch); + if (denyPrefixes.contains(sb.toString())) { + hasDominatedDeny = true; + break; + } + } + if (!hasDominatedDeny) + return AuthorizationResult.ALLOWED; + } + + for (String allowLiteral : allowLiterals) { + if (denyLiterals.contains(allowLiteral)) + continue; + StringBuilder sb = new StringBuilder(); + boolean hasDominatedDeny = false; + for (char ch : allowLiteral.toCharArray()) { Review comment: This looks identical to the code block above for prefix, we could just run the same code in a loop that checks both allow literals and prefixes. ########## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ########## @@ -175,4 +179,69 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + if (resourceType == ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + + if (resourceType == ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + + if (op == AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + + if (op == AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + + if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.ALLOWED + } else { + super.authorizeByResourceType(requestContext, op, resourceType) + } + } + + def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { + val resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY) + val accessControlEntry = new AccessControlEntryFilter( + null, null, null, AclPermissionType.DENY) + val aclFilter = new AclBindingFilter(resourceTypeFilter, accessControlEntry) + + for (binding <- acls(aclFilter).asScala) { + if (aceMatched(requestContext, op, binding) && canDenyAll(binding.pattern())) + return true + } + false + } + + def aceMatched(requestContext: AuthorizableRequestContext, + op: AclOperation, + binding: AclBinding): Boolean = { + (hostMatched(requestContext, binding) && principleMatched(requestContext, binding) Review comment: We coul just inline all the methods below instead of separate methods for host etc.? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -517,7 +517,9 @@ class KafkaApis(val requestChannel: RequestChannel, } // Note that authorization to a transactionalId implies ProducerId authorization - } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { + } else if (produceRequest.hasIdempotentRecords + && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME) + && !authorizeByResourceType(request.context, WRITE, TOPIC)) { Review comment: Hmm, Produce s authorized for topic anyway. Why would we use a very expensive authorizeByResourceType here? ########## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ########## @@ -1775,6 +1782,87 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertFalse("Cluster id not returned", response.clusterId.isEmpty) } + @Test + def testAuthorizeAnyDurability(): Unit = { Review comment: `Durability`? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2034,7 +2036,8 @@ class KafkaApis(val requestChannel: RequestChannel, sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) return } - } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { + } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME) Review comment: First authorize should use `logIfAllowed=true`, `logIfDenied=false` ########## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ########## @@ -175,4 +179,69 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + if (resourceType == ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + + if (resourceType == ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + + if (op == AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + + if (op == AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + + if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.ALLOWED + } else { + super.authorizeByResourceType(requestContext, op, resourceType) + } + } + + def denyAllResource(requestContext: AuthorizableRequestContext, Review comment: make this all the methods below `private` ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -82,16 +86,17 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { override def setUp(): Unit = { super.setUp() + val authorizers = Seq(aclAuthorizer, aclAuthorizer2, MockAuthorizer.authorizer) Review comment: It may be better to put the mock tests into another test class. That wouldn't request ZooKeeper for example. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -550,6 +660,31 @@ class AclAuthorizer extends Authorizer with Logging { } private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match { Review comment: We should run the microbenchmarks in AclAuthorizerBenchmark to make sure we don't add too much overhead here. ########## File path: core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.{lang, util} +import java.util.concurrent.CompletionStage + +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} +import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, AuthorizerServerInfo} + +object MockAuthorizer { + val authorizer = new AclAuthorizer Review comment: Don't we reuse this in multiple tests? How do we guarantee that no state is preserved between tests? ########## File path: core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.{lang, util} +import java.util.concurrent.CompletionStage + +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} +import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, AuthorizerServerInfo} + +object MockAuthorizer { + val authorizer = new AclAuthorizer +} + +/** + * A mock authorizer for testing the interface default + */ +class MockAuthorizer extends Authorizer { Review comment: We could mock this fully instead of using AclAuthorizer? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org