[ https://issues.apache.org/jira/browse/KAFKA-7011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505503#comment-16505503 ]
ASF GitHub Bot commented on KAFKA-7011: --------------------------------------- junrao closed pull request #5160: KAFKA-7011 - Remove ResourceNameType field from Java Resource class. URL: https://github.com/apache/kafka/pull/5160 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java index d264ef1a4f6..feba875ea62 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java @@ -19,48 +19,60 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourcePattern; import java.util.Objects; /** - * Represents a binding between a resource and an access control entry. + * Represents a binding between a resource pattern and an access control entry. * * The API for this class is still evolving and we may break compatibility in minor releases, if necessary. */ @InterfaceStability.Evolving public class AclBinding { - private final Resource resource; + private final ResourcePattern pattern; private final AccessControlEntry entry; + /** + * Create an instance of this class with the provided parameters. + * + * @param pattern non-null resource pattern. + * @param entry non-null entry + */ + public AclBinding(ResourcePattern pattern, AccessControlEntry entry) { + this.pattern = Objects.requireNonNull(pattern, "pattern"); + this.entry = Objects.requireNonNull(entry, "entry"); + } + /** * Create an instance of this class with the provided parameters. 
* * @param resource non-null resource * @param entry non-null entry + * @deprecated Since 2.0. Use {@link #AclBinding(ResourcePattern, AccessControlEntry)} */ + @Deprecated public AclBinding(Resource resource, AccessControlEntry entry) { - Objects.requireNonNull(resource); - this.resource = resource; - Objects.requireNonNull(entry); - this.entry = entry; + this(new ResourcePattern(resource.resourceType(), resource.name(), ResourceNameType.LITERAL), entry); } /** - * Return true if this binding has any UNKNOWN components. + * @return true if this binding has any UNKNOWN components. */ public boolean isUnknown() { - return resource.isUnknown() || entry.isUnknown(); + return pattern.isUnknown() || entry.isUnknown(); } /** - * Return the resource for this binding. + * @return the resource pattern for this binding. */ - public Resource resource() { - return resource; + public ResourcePattern pattern() { + return pattern; } /** - * Return the access control entry for this binding. + * @return the access control entry for this binding. */ public final AccessControlEntry entry() { return entry; @@ -70,24 +82,25 @@ public final AccessControlEntry entry() { * Create a filter which matches only this AclBinding. 
*/ public AclBindingFilter toFilter() { - return new AclBindingFilter(resource.toFilter(), entry.toFilter()); + return new AclBindingFilter(pattern.toFilter(), entry.toFilter()); } @Override public String toString() { - return "(resource=" + resource + ", entry=" + entry + ")"; + return "(pattern=" + pattern + ", entry=" + entry + ")"; } @Override public boolean equals(Object o) { - if (!(o instanceof AclBinding)) - return false; - AclBinding other = (AclBinding) o; - return resource.equals(other.resource) && entry.equals(other.entry); + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AclBinding that = (AclBinding) o; + return Objects.equals(pattern, that.pattern) && + Objects.equals(entry, that.entry); } @Override public int hashCode() { - return Objects.hash(resource, entry); + return Objects.hash(pattern, entry); } } diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java index 5841b5aeb9a..9d13fa7ae04 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java @@ -19,6 +19,8 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.resource.ResourceFilter; +import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourcePatternFilter; import java.util.Objects; @@ -29,43 +31,53 @@ */ @InterfaceStability.Evolving public class AclBindingFilter { - private final ResourceFilter resourceFilter; + private final ResourcePatternFilter patternFilter; private final AccessControlEntryFilter entryFilter; /** * A filter which matches any ACL binding. 
*/ - public static final AclBindingFilter ANY = new AclBindingFilter(ResourceFilter.ANY, AccessControlEntryFilter.ANY); + public static final AclBindingFilter ANY = new AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY); + + /** + * Create an instance of this filter with the provided parameters. + * + * @param patternFilter non-null pattern filter + * @param entryFilter non-null access control entry filter + */ + public AclBindingFilter(ResourcePatternFilter patternFilter, AccessControlEntryFilter entryFilter) { + this.patternFilter = Objects.requireNonNull(patternFilter, "patternFilter"); + this.entryFilter = Objects.requireNonNull(entryFilter, "entryFilter"); + } /** * Create an instance of this filter with the provided parameters. * * @param resourceFilter non-null resource filter * @param entryFilter non-null access control entry filter + * @deprecated Since 2.0. Use {@link #AclBindingFilter(ResourcePatternFilter, AccessControlEntryFilter)} */ + @Deprecated public AclBindingFilter(ResourceFilter resourceFilter, AccessControlEntryFilter entryFilter) { - Objects.requireNonNull(resourceFilter); - this.resourceFilter = resourceFilter; - Objects.requireNonNull(entryFilter); - this.entryFilter = entryFilter; + this(new ResourcePatternFilter(resourceFilter.resourceType(), resourceFilter.name(), ResourceNameType.LITERAL), entryFilter); } /** - * Return true if this filter has any UNKNOWN components. + * @return {@code true} if this filter has any UNKNOWN components. */ public boolean isUnknown() { - return resourceFilter.isUnknown() || entryFilter.isUnknown(); + return patternFilter.isUnknown() || entryFilter.isUnknown(); } /** - * Return the resource filter. + * @return the resource pattern filter. */ - public ResourceFilter resourceFilter() { - return resourceFilter; + public ResourcePatternFilter patternFilter() { + return patternFilter; } /** - * Return the access control entry filter. + * @return the access control entry filter. 
*/ public final AccessControlEntryFilter entryFilter() { return entryFilter; @@ -73,15 +85,16 @@ public final AccessControlEntryFilter entryFilter() { @Override public String toString() { - return "(resourceFilter=" + resourceFilter + ", entryFilter=" + entryFilter + ")"; + return "(patternFilter=" + patternFilter + ", entryFilter=" + entryFilter + ")"; } @Override public boolean equals(Object o) { - if (!(o instanceof AclBindingFilter)) - return false; - AclBindingFilter other = (AclBindingFilter) o; - return resourceFilter.equals(other.resourceFilter) && entryFilter.equals(other.entryFilter); + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AclBindingFilter that = (AclBindingFilter) o; + return Objects.equals(patternFilter, that.patternFilter) && + Objects.equals(entryFilter, that.entryFilter); } /** @@ -89,14 +102,14 @@ public boolean equals(Object o) { * there are no ANY or UNKNOWN fields. */ public boolean matchesAtMostOne() { - return resourceFilter.matchesAtMostOne() && entryFilter.matchesAtMostOne(); + return patternFilter.matchesAtMostOne() && entryFilter.matchesAtMostOne(); } /** * Return a string describing an ANY or UNKNOWN field, or null if there is no such field. */ public String findIndefiniteField() { - String indefinite = resourceFilter.findIndefiniteField(); + String indefinite = patternFilter.findIndefiniteField(); if (indefinite != null) return indefinite; return entryFilter.findIndefiniteField(); @@ -106,11 +119,11 @@ public String findIndefiniteField() { * Return true if the resource filter matches the binding's resource and the entry filter matches binding's entry. 
*/ public boolean matches(AclBinding binding) { - return resourceFilter.matches(binding.resource()) && entryFilter.matches(binding.entry()); + return patternFilter.matches(binding.pattern()) && entryFilter.matches(binding.entry()); } @Override public int hashCode() { - return Objects.hash(resourceFilter, entryFilter); + return Objects.hash(patternFilter, entryFilter); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java index 00f65c6ca1c..6782f7820ce 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java @@ -19,13 +19,13 @@ import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourceNameType; import org.apache.kafka.common.utils.Utils; @@ -81,9 +81,9 @@ public AclCreation(AclBinding acl) { } static AclCreation fromStruct(Struct struct) { - Resource resource = RequestUtils.resourceFromStructFields(struct); + ResourcePattern pattern = RequestUtils.resourcePatternromStructFields(struct); AccessControlEntry entry = RequestUtils.aceFromStructFields(struct); - return new AclCreation(new AclBinding(resource, entry)); + return new AclCreation(new AclBinding(pattern, entry)); } public AclBinding acl() { @@ -91,7 +91,7 @@ public AclBinding acl() { } void setStructFields(Struct struct) { - 
RequestUtils.resourceSetStructFields(acl.resource(), struct); + RequestUtils.resourcePatternSetStructFields(acl.pattern(), struct); RequestUtils.aceSetStructFields(acl.entry(), struct); } @@ -179,12 +179,19 @@ private void validate(List<AclCreation> aclCreations) { if (version() == 0) { final boolean unsupported = aclCreations.stream() .map(AclCreation::acl) - .map(AclBinding::resource) - .map(Resource::nameType) + .map(AclBinding::pattern) + .map(ResourcePattern::nameType) .anyMatch(nameType -> nameType != ResourceNameType.LITERAL); if (unsupported) { throw new UnsupportedVersionException("Version 0 only supports literal resource name types"); } } + + final boolean unknown = aclCreations.stream() + .map(AclCreation::acl) + .anyMatch(AclBinding::isUnknown); + if (unknown) { + throw new IllegalArgumentException("You can not create ACL bindings with unknown elements"); + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java index d896bb2c88c..9bb15a358f2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java @@ -24,8 +24,8 @@ import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; @@ -107,7 +107,7 @@ public DeleteAclsRequest(Struct struct, short version) { this.filters = new ArrayList<>(); for (Object filterStructObj : struct.getArray(FILTERS)) { Struct filterStruct = (Struct) filterStructObj; - ResourceFilter resourceFilter = 
RequestUtils.resourceFilterFromStructFields(filterStruct); + ResourcePatternFilter resourceFilter = RequestUtils.resourcePatternFilterFromStructFields(filterStruct); AccessControlEntryFilter aceFilter = RequestUtils.aceFilterFromStructFields(filterStruct); this.filters.add(new AclBindingFilter(resourceFilter, aceFilter)); } @@ -123,7 +123,7 @@ protected Struct toStruct() { List<Struct> filterStructs = new ArrayList<>(); for (AclBindingFilter filter : filters) { Struct filterStruct = struct.instance(FILTERS); - RequestUtils.resourceFilterSetStructFields(filter.resourceFilter(), filterStruct); + RequestUtils.resourcePatternFilterSetStructFields(filter.patternFilter(), filterStruct); RequestUtils.aceFilterSetStructFields(filter.entryFilter(), filterStruct); filterStructs.add(filterStruct); } @@ -156,12 +156,17 @@ public static DeleteAclsRequest parse(ByteBuffer buffer, short version) { private void validate(short version, List<AclBindingFilter> filters) { if (version == 0) { final boolean unsupported = filters.stream() - .map(AclBindingFilter::resourceFilter) - .map(ResourceFilter::nameType) + .map(AclBindingFilter::patternFilter) + .map(ResourcePatternFilter::nameType) .anyMatch(nameType -> nameType != ResourceNameType.LITERAL); if (unsupported) { throw new UnsupportedVersionException("Version 0 only supports literal resource name types"); } } + + final boolean unknown = filters.stream().anyMatch(AclBindingFilter::isUnknown); + if (unknown) { + throw new IllegalArgumentException("Filters contain UNKNOWN elements"); + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java index 1790457f7e1..112c6a34e8f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -18,6 +18,7 @@ import 
org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -25,7 +26,6 @@ import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourceNameType; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -180,7 +180,7 @@ public DeleteAclsResponse(Struct struct) { Struct matchingAclStruct = (Struct) matchingAclStructObj; ApiError matchError = new ApiError(matchingAclStruct); AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct); - Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct); + ResourcePattern resource = RequestUtils.resourcePatternromStructFields(matchingAclStruct); deletions.add(new AclDeletionResult(matchError, new AclBinding(resource, entry))); } this.responses.add(new AclFilterResponse(error, deletions)); @@ -201,7 +201,7 @@ protected Struct toStruct(short version) { for (AclDeletionResult deletion : response.deletions()) { Struct deletionStruct = responseStruct.instance(MATCHING_ACLS_KEY_NAME); deletion.error.write(deletionStruct); - RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct); + RequestUtils.resourcePatternSetStructFields(deletion.acl().pattern(), deletionStruct); RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct); deletionStructs.add(deletionStruct); } @@ -247,12 +247,20 @@ private void validate(short version) { final boolean unsupported = responses.stream() .flatMap(r -> r.deletions.stream()) .map(AclDeletionResult::acl) - .map(AclBinding::resource) - .map(Resource::nameType) + 
.map(AclBinding::pattern) + .map(ResourcePattern::nameType) .anyMatch(nameType -> nameType != ResourceNameType.LITERAL); if (unsupported) { throw new UnsupportedVersionException("Version 0 only supports literal resource name types"); } } + + final boolean unknown = responses.stream() + .flatMap(r -> r.deletions.stream()) + .map(AclDeletionResult::acl) + .anyMatch(AclBinding::isUnknown); + if (unknown) { + throw new IllegalArgumentException("Response contains UNKNOWN elements"); + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java index d3a04d09a13..50963602b28 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -17,14 +17,13 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.acl.AccessControlEntryFilter; -import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourcePatternFilter; import java.nio.ByteBuffer; import java.util.Collections; @@ -95,7 +94,7 @@ public String toString() { public DescribeAclsRequest(Struct struct, short version) { super(version); - ResourceFilter resourceFilter = RequestUtils.resourceFilterFromStructFields(struct); + ResourcePatternFilter resourceFilter = RequestUtils.resourcePatternFilterFromStructFields(struct); AccessControlEntryFilter entryFilter = RequestUtils.aceFilterFromStructFields(struct); this.filter = new 
AclBindingFilter(resourceFilter, entryFilter); } @@ -103,7 +102,7 @@ public DescribeAclsRequest(Struct struct, short version) { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.requestSchema(version())); - RequestUtils.resourceFilterSetStructFields(filter.resourceFilter(), struct); + RequestUtils.resourcePatternFilterSetStructFields(filter.patternFilter(), struct); RequestUtils.aceFilterSetStructFields(filter.entryFilter(), struct); return struct; } @@ -115,7 +114,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable case 0: case 1: return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable), - Collections.<AclBinding>emptySet()); + Collections.emptySet()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion())); @@ -131,8 +130,12 @@ public AclBindingFilter filter() { } private void validate(AclBindingFilter filter, short version) { - if (version == 0 && filter.resourceFilter().nameType() != ResourceNameType.LITERAL) { + if (version == 0 && filter.patternFilter().nameType() != ResourceNameType.LITERAL) { throw new UnsupportedVersionException("Version 0 only supports literal resource name types"); } + + if (filter.isUnknown()) { + throw new IllegalArgumentException("Filter contain UNKNOWN elements"); + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java index b6673d9b487..66f2895d310 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; +import 
org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -26,7 +27,6 @@ import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourceNameType; import java.nio.ByteBuffer; @@ -113,11 +113,11 @@ public DescribeAclsResponse(Struct struct) { this.acls = new ArrayList<>(); for (Object resourceStructObj : struct.getArray(RESOURCES_KEY_NAME)) { Struct resourceStruct = (Struct) resourceStructObj; - Resource resource = RequestUtils.resourceFromStructFields(resourceStruct); + ResourcePattern pattern = RequestUtils.resourcePatternromStructFields(resourceStruct); for (Object aclDataStructObj : resourceStruct.getArray(ACLS_KEY_NAME)) { Struct aclDataStruct = (Struct) aclDataStructObj; AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct); - this.acls.add(new AclBinding(resource, entry)); + this.acls.add(new AclBinding(pattern, entry)); } } } @@ -130,21 +130,18 @@ protected Struct toStruct(short version) { struct.set(THROTTLE_TIME_MS, throttleTimeMs); error.write(struct); - Map<Resource, List<AccessControlEntry>> resourceToData = new HashMap<>(); + Map<ResourcePattern, List<AccessControlEntry>> resourceToData = new HashMap<>(); for (AclBinding acl : acls) { - List<AccessControlEntry> entry = resourceToData.get(acl.resource()); - if (entry == null) { - entry = new ArrayList<>(); - resourceToData.put(acl.resource(), entry); - } - entry.add(acl.entry()); + resourceToData + .computeIfAbsent(acl.pattern(), k -> new ArrayList<>()) + .add(acl.entry()); } List<Struct> resourceStructs = new ArrayList<>(); - for (Map.Entry<Resource, List<AccessControlEntry>> tuple : resourceToData.entrySet()) { - Resource resource = 
tuple.getKey(); + for (Map.Entry<ResourcePattern, List<AccessControlEntry>> tuple : resourceToData.entrySet()) { + ResourcePattern resource = tuple.getKey(); Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); - RequestUtils.resourceSetStructFields(resource, resourceStruct); + RequestUtils.resourcePatternSetStructFields(resource, resourceStruct); List<Struct> dataStructs = new ArrayList<>(); for (AccessControlEntry entry : tuple.getValue()) { Struct dataStruct = resourceStruct.instance(ACLS_KEY_NAME); @@ -188,12 +185,17 @@ public boolean shouldClientThrottle(short version) { private void validate(short version) { if (version == 0) { final boolean unsupported = acls.stream() - .map(AclBinding::resource) - .map(Resource::nameType) + .map(AclBinding::pattern) + .map(ResourcePattern::nameType) .anyMatch(nameType -> nameType != ResourceNameType.LITERAL); if (unsupported) { throw new UnsupportedVersionException("Version 0 only supports literal resource name types"); } } + + final boolean unknown = acls.stream().anyMatch(AclBinding::isUnknown); + if (unknown) { + throw new IllegalArgumentException("Contain UNKNOWN elements"); + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java index f4f00a80641..76b27075253 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java @@ -20,11 +20,11 @@ import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.Resource; -import org.apache.kafka.common.resource.ResourceFilter; -import 
org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourceType; import static org.apache.kafka.common.protocol.CommonFields.HOST; import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER; @@ -42,39 +42,39 @@ private RequestUtils() {} - static Resource resourceFromStructFields(Struct struct) { + static ResourcePattern resourcePatternromStructFields(Struct struct) { byte resourceType = struct.get(RESOURCE_TYPE); String name = struct.get(RESOURCE_NAME); ResourceNameType resourceNameType = ResourceNameType.LITERAL; if (struct.hasField(RESOURCE_NAME_TYPE)) { resourceNameType = ResourceNameType.fromCode(struct.get(RESOURCE_NAME_TYPE)); } - return new Resource(ResourceType.fromCode(resourceType), name, resourceNameType); + return new ResourcePattern(ResourceType.fromCode(resourceType), name, resourceNameType); } - static void resourceSetStructFields(Resource resource, Struct struct) { - struct.set(RESOURCE_TYPE, resource.resourceType().code()); - struct.set(RESOURCE_NAME, resource.name()); + static void resourcePatternSetStructFields(ResourcePattern pattern, Struct struct) { + struct.set(RESOURCE_TYPE, pattern.resourceType().code()); + struct.set(RESOURCE_NAME, pattern.name()); if (struct.hasField(RESOURCE_NAME_TYPE)) { - struct.set(RESOURCE_NAME_TYPE, resource.nameType().code()); + struct.set(RESOURCE_NAME_TYPE, pattern.nameType().code()); } } - static ResourceFilter resourceFilterFromStructFields(Struct struct) { + static ResourcePatternFilter resourcePatternFilterFromStructFields(Struct struct) { byte resourceType = struct.get(RESOURCE_TYPE); String name = struct.get(RESOURCE_NAME_FILTER); ResourceNameType resourceNameType = ResourceNameType.LITERAL; if (struct.hasField(RESOURCE_NAME_TYPE_FILTER)) { resourceNameType = ResourceNameType.fromCode(struct.get(RESOURCE_NAME_TYPE_FILTER)); } - return new ResourceFilter(ResourceType.fromCode(resourceType), name, 
resourceNameType); + return new ResourcePatternFilter(ResourceType.fromCode(resourceType), name, resourceNameType); } - static void resourceFilterSetStructFields(ResourceFilter resourceFilter, Struct struct) { - struct.set(RESOURCE_TYPE, resourceFilter.resourceType().code()); - struct.set(RESOURCE_NAME_FILTER, resourceFilter.name()); + static void resourcePatternFilterSetStructFields(ResourcePatternFilter patternFilter, Struct struct) { + struct.set(RESOURCE_TYPE, patternFilter.resourceType().code()); + struct.set(RESOURCE_NAME_FILTER, patternFilter.name()); if (struct.hasField(RESOURCE_NAME_TYPE_FILTER)) { - struct.set(RESOURCE_NAME_TYPE_FILTER, resourceFilter.nameType().code()); + struct.set(RESOURCE_NAME_TYPE_FILTER, patternFilter.nameType().code()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java index a4810b27ac4..f41f41a04b6 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java @@ -22,20 +22,14 @@ import java.util.Objects; /** - * Represents a cluster resource with a tuple of (type, name, nameType). + * Represents a cluster resource with a tuple of (type, name). * * The API for this class is still evolving and we may break compatibility in minor releases, if necessary. */ @InterfaceStability.Evolving public class Resource { - /** - * A special literal resource name that corresponds to 'all resources of a certain type'. - */ - public static final String WILDCARD_RESOURCE = "*"; - private final ResourceType resourceType; private final String name; - private final ResourceNameType nameType; /** * The name of the CLUSTER resource. @@ -45,32 +39,19 @@ /** * A resource representing the whole cluster. 
*/ - public final static Resource CLUSTER = new Resource(ResourceType.CLUSTER, CLUSTER_NAME, ResourceNameType.LITERAL); - - /** - * Create an instance of this class with the provided parameters. - * - * @param resourceType non-null resource type - * @param name non-null resource name - * @param nameType non-null resource name type - */ - public Resource(ResourceType resourceType, String name, ResourceNameType nameType) { - this.resourceType = Objects.requireNonNull(resourceType, "resourceType"); - this.name = Objects.requireNonNull(name, "name"); - this.nameType = Objects.requireNonNull(nameType, "nameType"); - } + public final static Resource CLUSTER = new Resource(ResourceType.CLUSTER, CLUSTER_NAME); /** * Create an instance of this class with the provided parameters. - * Resource name type would default to ResourceNameType.LITERAL. * * @param resourceType non-null resource type * @param name non-null resource name - * @deprecated Since 2.0. Use {@link #Resource(ResourceType, String, ResourceNameType)} */ - @Deprecated public Resource(ResourceType resourceType, String name) { - this(resourceType, name, ResourceNameType.LITERAL); + Objects.requireNonNull(resourceType); + this.resourceType = resourceType; + Objects.requireNonNull(name); + this.name = name; } /** @@ -80,13 +61,6 @@ public ResourceType resourceType() { return resourceType; } - /** - * Return the resource name type. - */ - public ResourceNameType nameType() { - return nameType; - } - /** * Return the resource name. */ @@ -98,36 +72,31 @@ public String name() { * Create a filter which matches only this Resource. */ public ResourceFilter toFilter() { - return new ResourceFilter(resourceType, name, nameType); + return new ResourceFilter(resourceType, name); } @Override public String toString() { - return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", nameType=" + nameType + ")"; + return "(resourceType=" + resourceType + ", name=" + ((name == null) ? 
"<any>" : name) + ")"; } /** * Return true if this Resource has any UNKNOWN components. */ public boolean isUnknown() { - return resourceType.isUnknown() || nameType.isUnknown(); + return resourceType.isUnknown(); } @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) + if (!(o instanceof Resource)) return false; - - final Resource resource = (Resource) o; - return resourceType == resource.resourceType && - Objects.equals(name, resource.name) && - nameType == resource.nameType; + Resource other = (Resource) o; + return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); } @Override public int hashCode() { - return Objects.hash(resourceType, name, nameType); + return Objects.hash(resourceType, name); } } diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java index e197e918c26..0a4611f9874 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java @@ -21,8 +21,6 @@ import java.util.Objects; -import static org.apache.kafka.common.resource.Resource.WILDCARD_RESOURCE; - /** * A filter which matches Resource objects. * @@ -32,54 +30,22 @@ public class ResourceFilter { private final ResourceType resourceType; private final String name; - private final ResourceNameType nameType; /** * Matches any resource. */ - public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null, ResourceNameType.ANY); + public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null); /** - * Create a filter that matches {@link ResourceNameType#LITERAL literal} resources of the - * supplied {@code resourceType} and {@code name}. + * Create an instance of this class with the provided parameters. 
* * @param resourceType non-null resource type - * @param name resource name or {@code null}. - * If {@code null}, the filter will ignore the name of resources. - * @deprecated Since 2.0. Use {@link #ResourceFilter(ResourceType, String, ResourceNameType)} + * @param name resource name or null */ - @Deprecated public ResourceFilter(ResourceType resourceType, String name) { - this(resourceType, name, ResourceNameType.LITERAL); - } - - /** - * Create a filter that matches resources of the supplied {@code resourceType}, {@code name} and - * {@code nameType}. - * <p> - * If the filter has each three parameters fully supplied, then it will only match a resource that has exactly - * the same values, e.g. a filter of {@code new ResourceFilter(ResourceType.GROUP, "one", ResourceTypeName.PREFIXED)} - * will only match the resource {@code new Resource(ResourceType.GROUP, "one", ResourceTypeName.PREFIXED)}. - * <p> - * Any of the three parameters can be set to be ignored by the filter: - * <ul> - * <li><b>{@code resourceType}</b> can be set to {@link ResourceType#ANY}, - * meaning it will match a resource of any resource type</li> - * <li><b>{@code name}</b> can be set to {@code null}, meaning it will match a resource of any name.</li> - * <li><b>{@code nameType}</b> can be set to {@link ResourceNameType#ANY}, - * meaning it will match a resource with any resource name type, including the - * {@link Resource#WILDCARD_RESOURCE wildcard resource}</li> - * </ul> - * - * @param resourceType non-null resource type to filter by. - * @param name resource name to filter by, or {@code null}. - * If {@code null}, the filter will ignore the name of resources. - * @param nameType non-null resource name type to filter by. 
- */ - public ResourceFilter(ResourceType resourceType, String name, ResourceNameType nameType) { - this.resourceType = Objects.requireNonNull(resourceType, "resourceType"); + Objects.requireNonNull(resourceType); + this.resourceType = resourceType; this.name = name; - this.nameType = Objects.requireNonNull(nameType, "nameType"); } /** @@ -96,76 +62,40 @@ public String name() { return name; } - /** - * Return the resource name type. - */ - public ResourceNameType nameType() { - return nameType; - } - @Override public String toString() { - return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", nameType=" + nameType + ")"; + return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")"; } /** * Return true if this ResourceFilter has any UNKNOWN components. */ public boolean isUnknown() { - return resourceType.isUnknown() || nameType.isUnknown(); + return resourceType.isUnknown(); } @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) + if (!(o instanceof ResourceFilter)) return false; - - final ResourceFilter that = (ResourceFilter) o; - return resourceType == that.resourceType && - Objects.equals(name, that.name) && - nameType == that.nameType; + ResourceFilter other = (ResourceFilter) o; + return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); } @Override public int hashCode() { - return Objects.hash(resourceType, name, nameType); + return Objects.hash(resourceType, name); } /** * Return true if this filter matches the given Resource. - * @param other the resource path under which ACLs are stored. 
*/ - public boolean matches(final Resource other) { - throwOnInvalidParams(other); - - if (!resourceType().equals(ResourceType.ANY) && !resourceType().equals(other.resourceType())) { + public boolean matches(Resource other) { + if ((name != null) && (!name.equals(other.name()))) return false; - } - - if (!nameType().equals(ResourceNameType.ANY) && !nameType().equals(other.nameType())) { + if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType()))) return false; - } - - if (name() == null) { - return true; - } - - if (nameType().equals(other.nameType())) { - return other.name().equals(name()); - } - - switch (other.nameType()) { - case LITERAL: - return other.name().equals(name()) || other.name().equals(WILDCARD_RESOURCE); - - case PREFIXED: - return name().startsWith(other.name()); - - default: - throw new IllegalArgumentException("Unsupported ResourceNameType: " + other.nameType()); - } + return true; } /** @@ -185,20 +115,6 @@ public String findIndefiniteField() { return "Resource type is UNKNOWN."; if (name == null) return "Resource name is NULL."; - if (nameType == ResourceNameType.ANY) - return "Resource name type is ANY."; - if (nameType == ResourceNameType.UNKNOWN) - return "Resource name type is UNKNOWN."; return null; } - - private static void throwOnInvalidParams(final Resource aclPath) { - if (aclPath.resourceType().equals(ResourceType.ANY) || aclPath.resourceType().equals(ResourceType.UNKNOWN)) { - throw new IllegalArgumentException("Only concrete resource types are supported. Got: " + aclPath.resourceType()); - } - - if (aclPath.nameType().equals(ResourceNameType.ANY) || aclPath.nameType().equals(ResourceNameType.UNKNOWN)) { - throw new IllegalArgumentException("Only concrete resource name types are supported. 
Got: " + aclPath.nameType()); - } - } } diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java new file mode 100644 index 00000000000..c6aee913a65 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.resource; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Objects; + +/** + * Represents a pattern that is used by ACLs to match zero or more + * {@link org.apache.kafka.common.resource.Resource Resources}. + * + * The API for this class is still evolving and we may break compatibility in minor releases, if necessary. + */ +@InterfaceStability.Evolving +public class ResourcePattern { + /** + * A special literal resource name that corresponds to 'all resources of a certain type'. + */ + public static final String WILDCARD_RESOURCE = "*"; + + private final ResourceType resourceType; + private final String name; + private final ResourceNameType nameType; + + /** + * Create a pattern using the supplied parameters. 
+ * + * @param resourceType non-null, specific, resource type + * @param name non-null resource name, which can be the {@link #WILDCARD_RESOURCE}. + * @param nameType non-null, specific, resource name type, which controls how the pattern will match resource names. + */ + public ResourcePattern(ResourceType resourceType, String name, ResourceNameType nameType) { + this.resourceType = Objects.requireNonNull(resourceType, "resourceType"); + this.name = Objects.requireNonNull(name, "name"); + this.nameType = Objects.requireNonNull(nameType, "nameType"); + + if (resourceType == ResourceType.ANY) { + throw new IllegalArgumentException("resourceType must not be ANY"); + } + + if (nameType == ResourceNameType.ANY) { + throw new IllegalArgumentException("nameType must not be ANY"); + } + } + + /** + * @return the specific resource type this pattern matches + */ + public ResourceType resourceType() { + return resourceType; + } + + /** + * @return the resource name. + */ + public String name() { + return name; + } + + /** + * @return the resource name type. + */ + public ResourceNameType nameType() { + return nameType; + } + + /** + * @return a filter which matches only this pattern. + */ + public ResourcePatternFilter toFilter() { + return new ResourcePatternFilter(resourceType, name, nameType); + } + + @Override + public String toString() { + return "ResourcePattern(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", nameType=" + nameType + ")"; + } + + /** + * @return {@code true} if this Resource has any UNKNOWN components. 
+ */ + public boolean isUnknown() { + return resourceType.isUnknown() || nameType.isUnknown(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + final ResourcePattern resource = (ResourcePattern) o; + return resourceType == resource.resourceType && + Objects.equals(name, resource.name) && + nameType == resource.nameType; + } + + @Override + public int hashCode() { + return Objects.hash(resourceType, name, nameType); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePatternFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePatternFilter.java new file mode 100644 index 00000000000..8b4fdc0b855 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePatternFilter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.resource; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Objects; + +import static org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE; + +/** + * Represents a filter that can match {@link ResourcePattern}. 
+ * <p> + * The API for this class is still evolving and we may break compatibility in minor releases, if necessary. + */ +@InterfaceStability.Evolving +public class ResourcePatternFilter { + /** + * Matches any resource pattern. + */ + public static final ResourcePatternFilter ANY = new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.ANY); + + private final ResourceType resourceType; + private final String name; + private final ResourceNameType nameType; + + /** + * Create a filter using the supplied parameters. + * + * @param resourceType non-null resource type. + * If {@link ResourceType#ANY}, the filter will ignore the resource type of the pattern. + * If any other resource type, the filter will match only patterns with the same type. + * @param name resource name or {@code null}. + * If {@code null}, the filter will ignore the name of resources. + * If {@link ResourcePattern#WILDCARD_RESOURCE}, will match only wildcard patterns. + * @param nameType non-null resource name type. + * If {@link ResourceNameType#ANY}, the filter will match patterns that would match any + * {@code ResourceNameType}, and also any wildcards patterns. + * If any other resource name type, the filter will match only patterns with the same type. + */ + public ResourcePatternFilter(ResourceType resourceType, String name, ResourceNameType nameType) { + this.resourceType = Objects.requireNonNull(resourceType, "resourceType"); + this.name = name; + this.nameType = Objects.requireNonNull(nameType, "nameType"); + } + + /** + * @return {@code true} if this filter has any UNKNOWN components. + */ + public boolean isUnknown() { + return resourceType.isUnknown() || nameType.isUnknown(); + } + + /** + * @return the specific resource type this pattern matches + */ + public ResourceType resourceType() { + return resourceType; + } + + /** + * @return the resource name. + */ + public String name() { + return name; + } + + /** + * @return the resource name type. 
+ */ + public ResourceNameType nameType() { + return nameType; + } + + /** + * @return {@code true} if this filter matches the given pattern. + */ + public boolean matches(ResourcePattern pattern) { + if (!resourceType.equals(ResourceType.ANY) && !resourceType.equals(pattern.resourceType())) { + return false; + } + + if (!nameType.equals(ResourceNameType.ANY) && !nameType.equals(pattern.nameType())) { + return false; + } + + if (name == null) { + return true; + } + + if (nameType.equals(pattern.nameType())) { + return name.equals(pattern.name()); + } + + switch (pattern.nameType()) { + case LITERAL: + return name.equals(pattern.name()) || pattern.name().equals(WILDCARD_RESOURCE); + + case PREFIXED: + return name.startsWith(pattern.name()); + + default: + throw new IllegalArgumentException("Unsupported ResourceNameType: " + pattern.nameType()); + } + } + + /** + * @return {@code true} if this filter could only match one pattern. + * In other words, if there are no ANY or UNKNOWN fields. + */ + public boolean matchesAtMostOne() { + return findIndefiniteField() == null; + } + + /** + * @return a string describing any ANY or UNKNOWN field, or null if there is no such field. + */ + public String findIndefiniteField() { + if (resourceType == ResourceType.ANY) + return "Resource type is ANY."; + if (resourceType == ResourceType.UNKNOWN) + return "Resource type is UNKNOWN."; + if (name == null) + return "Resource name is NULL."; + if (nameType == ResourceNameType.ANY) + return "Resource name type is ANY."; + if (nameType == ResourceNameType.UNKNOWN) + return "Resource name type is UNKNOWN."; + return null; + } + + @Override + public String toString() { + return "ResourcePattern(resourceType=" + resourceType + ", name=" + ((name == null) ? 
"<any>" : name) + ", nameType=" + nameType + ")"; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + final ResourcePatternFilter resource = (ResourcePatternFilter) o; + return resourceType == resource.resourceType && + Objects.equals(name, resource.name) && + nameType == resource.nameType; + } + + @Override + public int hashCode() { + return Objects.hash(resourceType, name, nameType); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 3a300dbfb1b..5cb6bbc9761 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -34,6 +34,8 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; @@ -65,8 +67,6 @@ import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; -import org.apache.kafka.common.resource.Resource; -import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceNameType; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.utils.MockTime; @@ -537,13 +537,13 @@ private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTe } } - private static final AclBinding ACL1 = new AclBinding(new 
Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), + private static final AclBinding ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); - private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL), + private static final AclBinding ACL2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY)); - private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL), + private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.LITERAL), new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)); - private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL), + private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.LITERAL), new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)); @Test diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java index 6110c48e2b0..a35faca80ec 100644 --- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.common.acl; -import org.apache.kafka.common.resource.Resource; -import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceNameType; 
+import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; import org.junit.Test; @@ -28,38 +28,38 @@ public class AclBindingTest { private static final AclBinding ACL1 = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); private static final AclBinding ACL2 = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW)); private static final AclBinding ACL3 = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); private static final AclBinding UNKNOWN_ACL = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY)); private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter( - ResourceFilter.ANY, + ResourcePatternFilter.ANY, new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)); private static final AclBindingFilter ANY_DENY = new AclBindingFilter( - ResourceFilter.ANY, + ResourcePatternFilter.ANY, new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY)); private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter( - new 
ResourceFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), + new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)); @Test public void testMatching() throws Exception { assertTrue(ACL1.equals(ACL1)); final AclBinding acl1Copy = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); assertTrue(ACL1.equals(acl1Copy)); assertTrue(acl1Copy.equals(ACL1)); diff --git a/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternFilterTest.java b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternFilterTest.java new file mode 100644 index 00000000000..87b25fcbe08 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternFilterTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.junit.Test; + +import static org.apache.kafka.common.resource.ResourceNameType.LITERAL; +import static org.apache.kafka.common.resource.ResourceNameType.PREFIXED; +import static org.apache.kafka.common.resource.ResourceType.ANY; +import static org.apache.kafka.common.resource.ResourceType.GROUP; +import static org.apache.kafka.common.resource.ResourceType.TOPIC; +import static org.apache.kafka.common.resource.ResourceType.UNKNOWN; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ResourcePatternFilterTest { + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfResourceTypeIsAny() { + new ResourcePatternFilter(ANY, null, ResourceNameType.ANY) + .matches(new ResourcePattern(ANY, "Name", PREFIXED)); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfResourceNameTypeIsAny() { + new ResourcePatternFilter(ANY, null, ResourceNameType.ANY) + .matches(new ResourcePattern(GROUP, "Name", ResourceNameType.ANY)); + } + + @Test + public void shouldBeUnknownIfResourceTypeUnknown() { + assertTrue(new ResourcePatternFilter(UNKNOWN, null, ResourceNameType.LITERAL).isUnknown()); + } + + @Test + public void shouldBeUnknownIfResourceNameTypeUnknown() { + assertTrue(new ResourcePatternFilter(GROUP, null, ResourceNameType.UNKNOWN).isUnknown()); + } + + @Test + public void shouldNotMatchIfDifferentResourceType() { + assertFalse(new ResourcePatternFilter(TOPIC, "Name", LITERAL) + .matches(new ResourcePattern(GROUP, "Name", LITERAL))); + } + + @Test + public void shouldNotMatchIfDifferentName() { + assertFalse(new ResourcePatternFilter(TOPIC, "Different", PREFIXED) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } + + @Test + public 
void shouldNotMatchIfDifferentNameCase() { + assertFalse(new ResourcePatternFilter(TOPIC, "NAME", LITERAL) + .matches(new ResourcePattern(TOPIC, "Name", LITERAL))); + } + + @Test + public void shouldNotMatchIfDifferentNameType() { + assertFalse(new ResourcePatternFilter(TOPIC, "Name", LITERAL) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } + + @Test + public void shouldMatchWhereResourceTypeIsAny() { + assertTrue(new ResourcePatternFilter(ANY, "Name", PREFIXED) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } + + @Test + public void shouldMatchWhereResourceNameIsAny() { + assertTrue(new ResourcePatternFilter(TOPIC, null, PREFIXED) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } + + @Test + public void shouldMatchWhereResourceNameTypeIsAny() { + assertTrue(new ResourcePatternFilter(TOPIC, null, ResourceNameType.ANY) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } + + @Test + public void shouldMatchLiteralIfExactMatch() { + assertTrue(new ResourcePatternFilter(TOPIC, "Name", LITERAL) + .matches(new ResourcePattern(TOPIC, "Name", LITERAL))); + } + + @Test + public void shouldMatchLiteralIfNameMatchesAndFilterIsOnAnyNameType() { + assertTrue(new ResourcePatternFilter(TOPIC, "Name", ResourceNameType.ANY) + .matches(new ResourcePattern(TOPIC, "Name", LITERAL))); + } + + @Test + public void shouldNotMatchLiteralIfNamePrefixed() { + assertFalse(new ResourcePatternFilter(TOPIC, "Name-something", ResourceNameType.ANY) + .matches(new ResourcePattern(TOPIC, "Name", LITERAL))); + } + + @Test + public void shouldMatchLiteralWildcardIfExactMatch() { + assertTrue(new ResourcePatternFilter(TOPIC, "*", LITERAL) + .matches(new ResourcePattern(TOPIC, "*", LITERAL))); + } + + @Test + public void shouldNotMatchLiteralWildcardAgainstOtherName() { + assertFalse(new ResourcePatternFilter(TOPIC, "Name", LITERAL) + .matches(new ResourcePattern(TOPIC, "*", LITERAL))); + } + + @Test + public void 
shouldNotMatchLiteralWildcardTheWayAround() { + assertFalse(new ResourcePatternFilter(TOPIC, "*", LITERAL) + .matches(new ResourcePattern(TOPIC, "Name", LITERAL))); + } + + @Test + public void shouldMatchLiteralWildcardIfFilterHasNameTypeOfAny() { + assertTrue(new ResourcePatternFilter(TOPIC, "Name", ResourceNameType.ANY) + .matches(new ResourcePattern(TOPIC, "*", LITERAL))); + } + + @Test + public void shouldMatchPrefixedIfExactMatch() { + assertTrue(new ResourcePatternFilter(TOPIC, "Name", PREFIXED) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } + + @Test + public void shouldNotMatchIfBothPrefixedAndFilterIsPrefixOfResource() { + assertFalse(new ResourcePatternFilter(TOPIC, "Name", PREFIXED) + .matches(new ResourcePattern(TOPIC, "Name-something", PREFIXED))); + } + + @Test + public void shouldNotMatchIfBothPrefixedAndResourceIsPrefixOfFilter() { + assertFalse(new ResourcePatternFilter(TOPIC, "Name-something", PREFIXED) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } + + @Test + public void shouldMatchPrefixedIfNamePrefixedAnyFilterTypeIsAny() { + assertTrue(new ResourcePatternFilter(TOPIC, "Name-something", ResourceNameType.ANY) + .matches(new ResourcePattern(TOPIC, "Name", PREFIXED))); + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternTest.java b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternTest.java new file mode 100644 index 00000000000..d76e213118f --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.Test; + +public class ResourcePatternTest { + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfResourceTypeIsAny() { + new ResourcePattern(ResourceType.ANY, "name", ResourceNameType.LITERAL); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfResourceNameTypeIsAny() { + new ResourcePattern(ResourceType.TOPIC, "name", ResourceNameType.ANY); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowIfResourceNameIsNull() { + new ResourcePattern(ResourceType.TOPIC, null, ResourceNameType.ANY); + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java index 748914b4931..1f3c15c35cc 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java @@ -24,8 +24,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; -import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourceNameType; +import 
org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.junit.Test; @@ -39,13 +39,16 @@ private static final short V0 = 0; private static final short V1 = 1; - private static final AclBinding LITERAL_ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), + private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); - private static final AclBinding LITERAL_ACL2 = new AclBinding(new Resource(ResourceType.GROUP, "group", ResourceNameType.LITERAL), + private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", ResourceNameType.LITERAL), new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)); - private static final AclBinding PREFIXED_ACL1 = new AclBinding(new Resource(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); + + private static final AclBinding UNKNOWN_ACL1 = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "unknown", ResourceNameType.LITERAL), new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); @Test(expected = UnsupportedVersionException.class) @@ -53,6 +56,11 @@ public void shouldThrowOnV0IfNotLiteral() { new CreateAclsRequest(V0, aclCreations(PREFIXED_ACL1)); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnIfUnknown() { + new CreateAclsRequest(V0, aclCreations(UNKNOWN_ACL1)); + } + @Test public void shouldRoundTripV0() { final CreateAclsRequest original = new CreateAclsRequest(V0, 
aclCreations(LITERAL_ACL1, LITERAL_ACL2)); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java index 77613378671..42b7dcc808f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java @@ -21,9 +21,9 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceNameType; import org.apache.kafka.common.resource.ResourceType; import org.junit.Test; @@ -37,13 +37,16 @@ private static final short V0 = 0; private static final short V1 = 1; - private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), + private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); - private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); - private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new 
ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); + + private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "prefix", ResourceNameType.PREFIXED), new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); @Test(expected = UnsupportedVersionException.class) @@ -51,6 +54,11 @@ public void shouldThrowOnV0IfNotLiteral() { new DeleteAclsRequest(V0, aclFilters(PREFIXED_FILTER)); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnUnknownElements() { + new DeleteAclsRequest(V1, aclFilters(UNKNOWN_FILTER)); + } + @Test public void shouldRoundTripV0() { final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(LITERAL_FILTER)); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java index f8e9148cbd1..b6d9f92bf71 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java @@ -21,11 +21,11 @@ import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; -import 
org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourceNameType; import org.apache.kafka.common.resource.ResourceType; import org.junit.Test; @@ -41,24 +41,34 @@ private static final short V0 = 0; private static final short V1 = 1; - private static final AclBinding LITERAL_ACL1 = new AclBinding(new org.apache.kafka.common.resource.Resource(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), + private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); - private static final AclBinding LITERAL_ACL2 = new AclBinding(new org.apache.kafka.common.resource.Resource(ResourceType.GROUP, "group", ResourceNameType.LITERAL), + private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", ResourceNameType.LITERAL), new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)); - private static final AclBinding PREFIXED_ACL1 = new AclBinding(new Resource(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); + private static final AclBinding UNKNOWN_ACL = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "group", ResourceNameType.LITERAL), + new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)); + private static final AclFilterResponse LITERAL_RESPONSE = new AclFilterResponse(aclDeletions(LITERAL_ACL1, LITERAL_ACL2)); private static final AclFilterResponse PREFIXED_RESPONSE = new AclFilterResponse(aclDeletions(LITERAL_ACL1, PREFIXED_ACL1)); + private static final AclFilterResponse UNKNOWN_RESPONSE = new 
AclFilterResponse(aclDeletions(UNKNOWN_ACL)); + @Test(expected = UnsupportedVersionException.class) public void shouldThrowOnV0IfNotLiteral() { new DeleteAclsResponse(10, aclResponses(PREFIXED_RESPONSE)).toStruct(V0); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnIfUnknown() { + new DeleteAclsResponse(10, aclResponses(UNKNOWN_RESPONSE)).toStruct(V1); + } + @Test public void shouldRoundTripV0() { final DeleteAclsResponse original = new DeleteAclsResponse(10, aclResponses(LITERAL_RESPONSE)); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java index 543cf37576c..3e4e531e29d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java @@ -21,9 +21,9 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceNameType; import org.apache.kafka.common.resource.ResourceType; import org.junit.Test; @@ -34,20 +34,28 @@ private static final short V0 = 0; private static final short V1 = 1; - private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), + private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); - private 
static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); - private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); + private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL), + new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); + @Test(expected = UnsupportedVersionException.class) public void shouldThrowOnV0IfNotLiteral() { new DescribeAclsRequest(PREFIXED_FILTER, V0); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfUnknown() { + new DescribeAclsRequest(UNKNOWN_FILTER, V0); + } + @Test public void shouldRoundTripV0() { final DescribeAclsRequest original = new DescribeAclsRequest(LITERAL_FILTER, V0); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java index 81cf5185367..2d3ac8f8476 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java @@ -23,8 +23,8 @@ import org.apache.kafka.common.acl.AclPermissionType; import 
org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourceNameType; +import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.junit.Test; @@ -39,20 +39,28 @@ private static final short V0 = 0; private static final short V1 = 1; - private static final AclBinding LITERAL_ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), + private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); - private static final AclBinding LITERAL_ACL2 = new AclBinding(new Resource(ResourceType.GROUP, "group", ResourceNameType.LITERAL), + private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", ResourceNameType.LITERAL), new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)); - private static final AclBinding PREFIXED_ACL1 = new AclBinding(new Resource(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), + private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED), new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW)); + private static final AclBinding UNKNOWN_ACL = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL), + new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); + @Test(expected = UnsupportedVersionException.class) public void shouldThrowOnV0IfNotLiteral() { new DescribeAclsResponse(10, ApiError.NONE, aclBindings(PREFIXED_ACL1)).toStruct(V0); } + @Test(expected = 
IllegalArgumentException.class) + public void shouldThrowIfUnknown() { + new DescribeAclsResponse(10, ApiError.NONE, aclBindings(UNKNOWN_ACL)).toStruct(V0); + } + @Test public void shouldRoundTripV0() { final DescribeAclsResponse original = new DescribeAclsResponse(10, ApiError.NONE, aclBindings(LITERAL_ACL1, LITERAL_ACL2)); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e0c72a26f54..f537f48f8f4 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -25,6 +25,8 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.NotCoordinatorException; @@ -45,8 +47,6 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; -import org.apache.kafka.common.resource.Resource; -import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.resource.ResourceNameType; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -1087,23 +1087,23 @@ private TxnOffsetCommitResponse createTxnOffsetCommitResponse() { private DescribeAclsRequest createListAclsRequest() { return new DescribeAclsRequest.Builder(new AclBindingFilter( - new ResourceFilter(ResourceType.TOPIC, "mytopic", 
ResourceNameType.LITERAL), + new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build(); } private DescribeAclsResponse createDescribeAclsResponse() { return new DescribeAclsResponse(0, ApiError.NONE, Collections.singleton(new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)))); } private CreateAclsRequest createCreateAclsRequest() { List<AclCreation> creations = new ArrayList<>(); creations.add(new AclCreation(new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.ALLOW)))); creations.add(new AclCreation(new AclBinding( - new Resource(ResourceType.GROUP, "mygroup", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.GROUP, "mygroup", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY)))); return new CreateAclsRequest.Builder(creations).build(); } @@ -1116,10 +1116,10 @@ private CreateAclsResponse createCreateAclsResponse() { private DeleteAclsRequest createDeleteAclsRequest() { List<AclBindingFilter> filters = new ArrayList<>(); filters.add(new AclBindingFilter( - new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL), + new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.LITERAL), new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY))); filters.add(new AclBindingFilter( - new ResourceFilter(ResourceType.ANY, null, ResourceNameType.LITERAL), + new ResourcePatternFilter(ResourceType.ANY, null, 
ResourceNameType.LITERAL), new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY))); return new DeleteAclsRequest.Builder(filters).build(); } @@ -1128,10 +1128,10 @@ private DeleteAclsResponse createDeleteAclsResponse() { List<AclFilterResponse> responses = new ArrayList<>(); responses.add(new AclFilterResponse(Utils.mkSet( new AclDeletionResult(new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))), new AclDeletionResult(new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL), + new ResourcePattern(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY)))))); responses.add(new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"), Collections.<AclDeletionResult>emptySet())); diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceFilterTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceFilterTest.java index 9b2d6d4e696..4399744d91d 100644 --- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceFilterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceFilterTest.java @@ -19,145 +19,64 @@ import org.junit.Test; -import static org.apache.kafka.common.resource.ResourceNameType.LITERAL; -import static org.apache.kafka.common.resource.ResourceNameType.PREFIXED; import static org.apache.kafka.common.resource.ResourceType.ANY; import static org.apache.kafka.common.resource.ResourceType.GROUP; import static org.apache.kafka.common.resource.ResourceType.TOPIC; -import static org.apache.kafka.common.resource.ResourceType.UNKNOWN; import static org.junit.Assert.assertFalse; import static 
org.junit.Assert.assertTrue; public class ResourceFilterTest { - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfResourceTypeIsAny() { - new ResourceFilter(ANY, null, ResourceNameType.ANY) - .matches(new Resource(ANY, "Name", PREFIXED)); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfResourceTypeIsUnknown() { - new ResourceFilter(ANY, null, ResourceNameType.ANY) - .matches(new Resource(UNKNOWN, "Name", LITERAL)); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfResourceNameTypeIsAny() { - new ResourceFilter(ANY, null, ResourceNameType.ANY) - .matches(new Resource(GROUP, "Name", ResourceNameType.ANY)); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfAclResourceNameTypeIsUnknown() { - new ResourceFilter(ANY, null, ResourceNameType.ANY) - .matches(new Resource(GROUP, "Name", ResourceNameType.UNKNOWN)); - } - @Test public void shouldNotMatchIfDifferentResourceType() { - assertFalse(new ResourceFilter(TOPIC, "Name", LITERAL) - .matches(new Resource(GROUP, "Name", LITERAL))); + assertFalse(new ResourceFilter(TOPIC, "Name") + .matches(new Resource(GROUP, "Name"))); } @Test public void shouldNotMatchIfDifferentName() { - assertFalse(new ResourceFilter(TOPIC, "Different", PREFIXED) - .matches(new Resource(TOPIC, "Name", PREFIXED))); + assertFalse(new ResourceFilter(TOPIC, "Different") + .matches(new Resource(TOPIC, "Name"))); } @Test public void shouldNotMatchIfDifferentNameCase() { - assertFalse(new ResourceFilter(TOPIC, "NAME", LITERAL) - .matches(new Resource(TOPIC, "Name", LITERAL))); - } - - @Test - public void shouldNotMatchIfDifferentNameType() { - assertFalse(new ResourceFilter(TOPIC, "Name", LITERAL) - .matches(new Resource(TOPIC, "Name", PREFIXED))); + assertFalse(new ResourceFilter(TOPIC, "NAME") + .matches(new Resource(TOPIC, "Name"))); } @Test public void shouldMatchWhereResourceTypeIsAny() { - assertTrue(new 
ResourceFilter(ANY, "Name", PREFIXED) - .matches(new Resource(TOPIC, "Name", PREFIXED))); + assertTrue(new ResourceFilter(ANY, "Name") + .matches(new Resource(TOPIC, "Name"))); } @Test public void shouldMatchWhereResourceNameIsAny() { - assertTrue(new ResourceFilter(TOPIC, null, PREFIXED) - .matches(new Resource(TOPIC, "Name", PREFIXED))); - } - - @Test - public void shouldMatchWhereResourceNameTypeIsAny() { - assertTrue(new ResourceFilter(TOPIC, null, ResourceNameType.ANY) - .matches(new Resource(TOPIC, "Name", PREFIXED))); + assertTrue(new ResourceFilter(TOPIC, null) + .matches(new Resource(TOPIC, "Name"))); } @Test - public void shouldMatchLiteralIfExactMatch() { - assertTrue(new ResourceFilter(TOPIC, "Name", LITERAL) - .matches(new Resource(TOPIC, "Name", LITERAL))); + public void shouldMatchIfExactMatch() { + assertTrue(new ResourceFilter(TOPIC, "Name") + .matches(new Resource(TOPIC, "Name"))); } @Test - public void shouldMatchLiteralIfNameMatchesAndFilterIsOnAnyNameType() { - assertTrue(new ResourceFilter(TOPIC, "Name", ResourceNameType.ANY) - .matches(new Resource(TOPIC, "Name", LITERAL))); + public void shouldMatchWildcardIfExactMatch() { + assertTrue(new ResourceFilter(TOPIC, "*") + .matches(new Resource(TOPIC, "*"))); } @Test - public void shouldNotMatchLiteralIfNamePrefixed() { - assertFalse(new ResourceFilter(TOPIC, "Name-something", ResourceNameType.ANY) - .matches(new Resource(TOPIC, "Name", LITERAL))); - } - - @Test - public void shouldMatchLiteralWildcardIfExactMatch() { - assertTrue(new ResourceFilter(TOPIC, "*", LITERAL) - .matches(new Resource(TOPIC, "*", LITERAL))); - } - - @Test - public void shouldNotMatchLiteralWildcardAgainstOtherName() { - assertFalse(new ResourceFilter(TOPIC, "Name", LITERAL) - .matches(new Resource(TOPIC, "*", LITERAL))); + public void shouldNotMatchWildcardAgainstOtherName() { + assertFalse(new ResourceFilter(TOPIC, "Name") + .matches(new Resource(TOPIC, "*"))); } @Test public void 
shouldNotMatchLiteralWildcardTheWayAround() { - assertFalse(new ResourceFilter(TOPIC, "*", LITERAL) - .matches(new Resource(TOPIC, "Name", LITERAL))); - } - - @Test - public void shouldMatchLiteralWildcardIfFilterHasNameTypeOfAny() { - assertTrue(new ResourceFilter(TOPIC, "Name", ResourceNameType.ANY) - .matches(new Resource(TOPIC, "*", LITERAL))); - } - - @Test - public void shouldMatchPrefixedIfExactMatch() { - assertTrue(new ResourceFilter(TOPIC, "Name", PREFIXED) - .matches(new Resource(TOPIC, "Name", PREFIXED))); - } - - @Test - public void shouldNotMatchIfBothPrefixedAndFilterIsPrefixOfResource() { - assertFalse(new ResourceFilter(TOPIC, "Name", PREFIXED) - .matches(new Resource(TOPIC, "Name-something", PREFIXED))); - } - - @Test - public void shouldNotMatchIfBothPrefixedAndResourceIsPrefixOfFilter() { - assertFalse(new ResourceFilter(TOPIC, "Name-something", PREFIXED) - .matches(new Resource(TOPIC, "Name", PREFIXED))); - } - - @Test - public void shouldMatchPrefixedIfNamePrefixedAnyFilterTypeIsAny() { - assertTrue(new ResourceFilter(TOPIC, "Name-something", ResourceNameType.ANY) - .matches(new Resource(TOPIC, "Name", PREFIXED))); + assertFalse(new ResourceFilter(TOPIC, "*") + .matches(new Resource(TOPIC, "Name"))); } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 25b630d4159..d55e8861df0 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -25,13 +25,13 @@ import kafka.utils._ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType => JResourceNameType, ResourceType => JResourceType, Resource => JResource} +import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType => JResourceNameType, ResourceType => 
JResourceType, Resource => JResource} import scala.collection.JavaConverters._ object AclCommand extends Logging { - val ClusterResourceFilter = new ResourceFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, JResourceNameType.LITERAL) + val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, JResourceNameType.LITERAL) private val Newline = scala.util.Properties.lineSeparator @@ -126,7 +126,7 @@ object AclCommand extends Logging { } } - private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourceFilter) { + private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter) { getAcls(authorizer, filter) .keys .foreach(resource => @@ -148,12 +148,12 @@ object AclCommand extends Logging { } } - private def getAcls(authorizer: Authorizer, filter: ResourceFilter): Map[Resource, Set[Acl]] = + private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] = authorizer.getAcls() - .filter { case (resource, acl) => filter.matches(resource.toJava) } + .filter { case (resource, acl) => filter.matches(resource.toPattern) } - private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = { - var resourceToAcls = Map.empty[ResourceFilter, Set[Acl]] + private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = { + var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]] //if none of the --producer or --consumer options are specified , just construct ACLs from CLI options. 
if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) { @@ -172,11 +172,11 @@ object AclCommand extends Logging { resourceToAcls } - private def getProducerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = { + private def getProducerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = { val filters = getResourceFilter(opts) - val topics: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.TOPIC) - val transactionalIds: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.TRANSACTIONAL_ID) + val topics: Set[ResourcePatternFilter] = filters.filter(_.resourceType == JResourceType.TOPIC) + val transactionalIds: Set[ResourcePatternFilter] = filters.filter(_.resourceType == JResourceType.TRANSACTIONAL_ID) val enableIdempotence = opts.options.has(opts.idempotentOpt) val topicAcls = getAcl(opts, Set(Write, Describe, Create)) @@ -185,27 +185,27 @@ object AclCommand extends Logging { //Write, Describe, Create permission on topics, Write, Describe on transactionalIds topics.map(_ -> topicAcls).toMap ++ transactionalIds.map(_ -> transactionalIdAcls).toMap ++ - (if (enableIdempotence) + (if (enableIdempotence) Map(ClusterResourceFilter -> getAcl(opts, Set(IdempotentWrite))) else Map.empty) } - private def getConsumerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = { + private def getConsumerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = { val filters = getResourceFilter(opts) - val topics: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.TOPIC) - val groups: Set[ResourceFilter] = filters.filter(_.resourceType == JResourceType.GROUP) + val topics: Set[ResourcePatternFilter] = filters.filter(_.resourceType == JResourceType.TOPIC) + val groups: Set[ResourcePatternFilter] = filters.filter(_.resourceType == JResourceType.GROUP) //Read, Describe on topic, Read on consumerGroup 
val acls = getAcl(opts, Set(Read, Describe)) - topics.map(_ -> acls).toMap[ResourceFilter, Set[Acl]] ++ - groups.map(_ -> getAcl(opts, Set(Read))).toMap[ResourceFilter, Set[Acl]] + topics.map(_ -> acls).toMap[ResourcePatternFilter, Set[Acl]] ++ + groups.map(_ -> getAcl(opts, Set(Read))).toMap[ResourcePatternFilter, Set[Acl]] } - private def getCliResourceFilterToAcls(opts: AclCommandOptions): Map[ResourceFilter, Set[Acl]] = { + private def getCliResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = { val acls = getAcl(opts) val filters = getResourceFilter(opts) filters.map(_ -> acls).toMap @@ -261,25 +261,25 @@ object AclCommand extends Logging { Set.empty[KafkaPrincipal] } - private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourceFilter] = { + private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = { val resourceNameType: JResourceNameType = opts.options.valueOf(opts.resourceNameType) - var resourceFilters = Set.empty[ResourceFilter] + var resourceFilters = Set.empty[ResourcePatternFilter] if (opts.options.has(opts.topicOpt)) - opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourceFilter(JResourceType.TOPIC, topic.trim, resourceNameType)) + opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, resourceNameType)) if (resourceNameType == JResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt))) resourceFilters += ClusterResourceFilter if (opts.options.has(opts.groupOpt)) - opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resourceFilters += new ResourceFilter(JResourceType.GROUP, group.trim, resourceNameType)) + opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resourceFilters += new ResourcePatternFilter(JResourceType.GROUP, 
group.trim, resourceNameType)) if (opts.options.has(opts.transactionalIdOpt)) opts.options.valuesOf(opts.transactionalIdOpt).asScala.foreach(transactionalId => - resourceFilters += new ResourceFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, resourceNameType)) + resourceFilters += new ResourcePatternFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, resourceNameType)) if (opts.options.has(opts.delegationTokenOpt)) - opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resourceFilters += new ResourceFilter(JResourceType.DELEGATION_TOKEN, token.trim, resourceNameType)) + opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resourceFilters += new ResourcePatternFilter(JResourceType.DELEGATION_TOKEN, token.trim, resourceNameType)) if (resourceFilters.isEmpty && dieIfNoResourceFound) CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>") @@ -294,7 +294,7 @@ object AclCommand extends Logging { Console.readLine().equalsIgnoreCase("y") } - private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourceFilter, Set[Acl]]): Unit = { + private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourcePatternFilter, Set[Acl]]): Unit = { for ((resource, acls) <- resourceToAcls) { val validOps = ResourceTypeToValidOperations(resource.resourceType) if ((acls.map(_.operation) -- validOps).nonEmpty) diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala index 8442ba0a36f..7489a3e9b64 100644 --- a/core/src/main/scala/kafka/security/SecurityUtils.scala +++ b/core/src/main/scala/kafka/security/SecurityUtils.scala @@ -21,7 +21,7 @@ import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceNa import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter} import 
org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ApiError -import org.apache.kafka.common.resource.{Resource => AdminResource} +import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.util.{Failure, Success, Try} @@ -31,12 +31,12 @@ object SecurityUtils { def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = { (for { - resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType)) - resourceNameType <- Try(ResourceNameType.fromJava(filter.resourceFilter.nameType)) + resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType)) + resourceNameType <- Try(ResourceNameType.fromJava(filter.patternFilter.nameType)) principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal)) operation <- Try(Operation.fromJava(filter.entryFilter.operation)) permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType)) - resource = Resource(resourceType, filter.resourceFilter.name, resourceNameType) + resource = Resource(resourceType, filter.patternFilter.name, resourceNameType) acl = Acl(principal, permissionType, filter.entryFilter.host, operation) } yield (resource, acl)) match { case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage)) @@ -45,10 +45,10 @@ object SecurityUtils { } def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = { - val adminResource = new AdminResource(resource.resourceType.toJava, resource.name, resource.resourceNameType.toJava) + val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava) val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava) - new AclBinding(adminResource, entry) + new AclBinding(resourcePattern, entry) } } diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala 
b/core/src/main/scala/kafka/security/auth/Acl.scala index 7fa1638cf0d..bdd85840360 100644 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -18,6 +18,7 @@ package kafka.security.auth import kafka.utils.Json +import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.SecurityUtils @@ -26,7 +27,7 @@ import scala.collection.JavaConverters._ object Acl { val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*") val WildCardHost: String = "*" - val WildCardResource: String = org.apache.kafka.common.resource.Resource.WILDCARD_RESOURCE + val WildCardResource: String = ResourcePattern.WILDCARD_RESOURCE val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All) val PrincipalKey = "principal" val PermissionTypeKey = "permissionType" diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala index fa63fccf10f..c9b5727c46c 100644 --- a/core/src/main/scala/kafka/security/auth/Resource.scala +++ b/core/src/main/scala/kafka/security/auth/Resource.scala @@ -16,14 +16,29 @@ */ package kafka.security.auth -import java.util.Objects -import org.apache.kafka.common.resource.{Resource => JResource} +import org.apache.kafka.common.resource.ResourcePattern object Resource { + val Separator = ":" val ClusterResourceName = "kafka-cluster" val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName, Literal) val ProducerIdResourceName = "producer-id" val WildCardResource = "*" + + def fromString(str: String): Resource = { + ResourceNameType.values.find(nameType => str.startsWith(nameType.name)) match { + case Some(nameType) => + str.split(Separator, 3) match { + case Array(_, resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, nameType) + case _ => throw new 
IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) + } + case _ => + str.split(Separator, 2) match { + case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, Literal) + case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) + } + } + } } /** @@ -31,13 +46,9 @@ object Resource { * @param resourceType non-null type of resource. * @param name non-null name of the resource, for topic this will be topic name , for group it will be group name. For cluster type * it will be a constant string kafka-cluster. - * @param resourceNameType non-null type of resource name: literal, prefixed, etc. + * @param nameType non-null type of resource name: literal, prefixed, etc. */ -case class Resource(resourceType: ResourceType, name: String, resourceNameType: ResourceNameType) { - - Objects.requireNonNull(resourceType, "resourceType") - Objects.requireNonNull(name, "name") - Objects.requireNonNull(resourceNameType, "resourceNameType") +case class Resource(resourceType: ResourceType, name: String, nameType: ResourceNameType) { /** * Create an instance of this class with the provided parameters. 
@@ -52,8 +63,12 @@ case class Resource(resourceType: ResourceType, name: String, resourceNameType: this(resourceType, name, Literal) } - def toJava: JResource = { - new JResource(resourceType.toJava, name, resourceNameType.toJava) + def toPattern: ResourcePattern = { + new ResourcePattern(resourceType.toJava, name, nameType.toJava) + } + + override def toString: String = { + nameType + Resource.Separator + resourceType.name + Resource.Separator + name } } diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index c828970cc0d..0cb2fae2fa2 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -27,7 +27,6 @@ import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore} -import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType => JResourceNameType} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} @@ -102,8 +101,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { - if (resource.resourceNameType != Literal) { - throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.resourceNameType) + if (resource.nameType != Literal) { + throw new IllegalArgumentException("Only literal resources are supported. 
Got: " + resource.nameType) } val principal = session.principal @@ -203,8 +202,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = { - val filter = new ResourceFilter(resourceType.toJava, resourceName, JResourceNameType.ANY) - inReadLock(lock) { val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, Literal)) .map(_.acls) @@ -368,7 +365,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { if (rt != 0) rt else { - val rnt = a.resourceNameType compare b.resourceNameType + val rnt = a.nameType compare b.nameType if (rnt != 0) rnt else diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f4b8689f222..6d9e3d115b8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -52,7 +52,6 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, A import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} -import org.apache.kafka.common.resource.{Resource => AdminResource} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{Time, Utils} @@ -63,6 +62,7 @@ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails +import org.apache.kafka.common.resource.ResourcePattern /** * Logic to handle the various Kafka requests @@ -1920,7 +1920,7 @@ class KafkaApis(val requestChannel: RequestChannel, val filter = describeAclsRequest.filter() val 
returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) => acls.flatMap { acl => - val fixture = new AclBinding(new AdminResource(resource.resourceType.toJava, resource.name, resource.resourceNameType.toJava), + val fixture = new AclBinding(new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava), new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava)) if (filter.matches(fixture)) Some(fixture) else None @@ -1994,7 +1994,7 @@ class KafkaApis(val requestChannel: RequestChannel, val filtersWithIndex = filters.zipWithIndex for ((resource, acls) <- aclMap; acl <- acls) { val binding = new AclBinding( - new AdminResource(resource.resourceType.toJava, resource.name, resource.resourceNameType.toJava), + new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava), new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava)) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 90f71a1e1d2..20e4b83c842 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1008,7 +1008,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @param resource resource name */ def createAclChangeNotification(resource: Resource): Unit = { - val store = ZkAclStore(resource.resourceNameType) + val store = ZkAclStore(resource.nameType) val path = store.changeSequenceZNode.createPath val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 0524b4599c1..61210354dc4 100644 --- 
a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -487,7 +487,7 @@ object ZkAclStore { } object ResourceZNode { - def path(resource: Resource): String = ZkAclStore(resource.resourceNameType).path(resource.resourceType, resource.name) + def path(resource: Resource): String = ZkAclStore(resource.nameType).path(resource.resourceType, resource.name) def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava) def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 331a4491e0d..986fa4a366a 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -40,7 +40,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse} -import org.apache.kafka.common.resource.{Resource, ResourceNameType, ResourceType} +import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern, ResourceType} import org.junit.rules.Timeout import org.junit.Assert._ @@ -933,7 +933,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { checkInvalidAlterConfigs(zkClient, servers, client) } - val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), + val ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) /** diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index b40dab7809c..a3b32335652 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -40,9 +40,9 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} -import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType, Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests} +import org.apache.kafka.common.{KafkaException, Node, TopicPartition, acl, requests, resource} import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} @@ -378,12 +378,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def createAclsRequest = new CreateAclsRequest.Builder( Collections.singletonList(new AclCreation(new AclBinding( - new AdminResource(AdminResourceType.TOPIC, "mytopic"), + new ResourcePattern(AdminResourceType.TOPIC, "mytopic", resource.ResourceNameType.LITERAL), new AccessControlEntry(userPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.DENY))))).build() private def deleteAclsRequest = new DeleteAclsRequest.Builder( Collections.singletonList(new AclBindingFilter( - new ResourceFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL), + new ResourcePatternFilter(AdminResourceType.TOPIC, null, resource.ResourceNameType.LITERAL), new AccessControlEntryFilter(userPrincipal.toString, 
"*", AclOperation.ANY, AclPermissionType.DENY)))).build() private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build() diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index b3572c0665b..46bf722b3f0 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -15,13 +15,13 @@ package kafka.api import java.io.File import java.util -import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Literal, Operation, PermissionType, SimpleAclAuthorizer, Topic, Prefixed, Acl => AuthAcl, Resource => AuthResource} +import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource} import kafka.server.KafkaConfig import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions} import org.apache.kafka.common.acl._ import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException} -import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceNameType, ResourceType} +import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.junit.Assert.assertEquals import org.junit.{After, Assert, Before, Test} @@ -89,19 +89,19 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with closeSasl() } - val anyAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*", 
ResourceNameType.LITERAL), + val anyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) - val acl2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), + val acl2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)) - val acl3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), + val acl3 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) - val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL), + val fooAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) - val prefixAcl = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED), + val prefixAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) - val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id", ResourceNameType.LITERAL), + val transactionalIdAcl = new AclBinding(new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "transactional_id", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)) - val groupAcl = new AclBinding(new Resource(ResourceType.GROUP, "*", ResourceNameType.LITERAL), + val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", ResourceNameType.LITERAL), new 
AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) @Test @@ -111,7 +111,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with val results = client.createAcls(List(acl2, acl3).asJava) assertEquals(Set(acl2, acl3), results.values.keySet().asScala) results.values.values().asScala.foreach(value => value.get) - val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), + val aclUnknown = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW)) val results2 = client.createAcls(List(aclUnknown).asJava) assertEquals(Set(aclUnknown), results2.values.keySet().asScala) @@ -132,9 +132,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with waitForDescribeAcls(client, acl2.toFilter, Set(acl2)) waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl)) - val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val filterA = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val filterB = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val filterC = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) waitForDescribeAcls(client, filterA, Set(groupAcl)) 
waitForDescribeAcls(client, filterC, Set(transactionalIdAcl)) @@ -154,13 +154,13 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with client = AdminClient.create(createConfig()) ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) - val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY) - val allLiteralTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val allPrefixedTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY) - val literalMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val prefixedMyTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY) - val allMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.ANY), AccessControlEntryFilter.ANY) - val allFooTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foobar", ResourceNameType.ANY), AccessControlEntryFilter.ANY) + val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY) + val allLiteralTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val allPrefixedTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY) + val literalMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val prefixedMyTopicAcls = new AclBindingFilter(new 
ResourcePatternFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY) + val allMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.ANY), AccessControlEntryFilter.ANY) + val allFooTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foobar", ResourceNameType.ANY), AccessControlEntryFilter.ANY) assertEquals(Set(anyAcl), getAcls(anyAcl.toFilter)) assertEquals(Set(prefixAcl), getAcls(prefixAcl.toFilter)) @@ -181,9 +181,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with client = AdminClient.create(createConfig()) ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) - val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY) - val allLiteralTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val allPrefixedTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY) + val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY) + val allLiteralTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val allPrefixedTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY) // Delete only ACLs on literal 'mytopic2' topic var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet @@ -231,11 +231,11 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with client = AdminClient.create(createConfig()) ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should 
never be returned. - val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY) - val legacyAllTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val legacyMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val legacyAnyTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) - val legacyFooTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY) + val legacyAllTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val legacyMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val legacyAnyTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) + val legacyFooTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY) assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(legacyAllTopicAcls)) assertEquals(Set(acl2), getAcls(legacyMyTopic2Acls)) @@ -266,9 +266,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with @Test def testAttemptToCreateInvalidAcls(): Unit = { client = AdminClient.create(createConfig()) - val clusterAcl = new AclBinding(new Resource(ResourceType.CLUSTER, "foobar", ResourceNameType.LITERAL), + val clusterAcl = new 
AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) - val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, "", ResourceNameType.LITERAL), + val emptyResourceNameAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions()) assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala) @@ -336,7 +336,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with private def testAclGet(expectAuth: Boolean): Unit = { TestUtils.waitUntilTrue(() => { - val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), + val userAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) val results = client.describeAcls(userAcl.toFilter) if (expectAuth) { diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala new file mode 100644 index 00000000000..66049b41629 --- /dev/null +++ b/core/src/test/scala/kafka/security/auth/ResourceTest.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.security.auth + +import kafka.common.KafkaException +import org.junit.Test +import org.junit.Assert._ + +class ResourceTest { + @Test(expected = classOf[KafkaException]) + def shouldThrowTwoPartStringWithUnknownResourceType(): Unit = { + Resource.fromString("Unknown:fred") + } + + @Test + def shouldParseOldTwoPartString(): Unit = { + assertEquals(Resource(Group, "fred", Literal), Resource.fromString("Group:fred")) + assertEquals(Resource(Topic, "t", Literal), Resource.fromString("Topic:t")) + } + + @Test + def shouldParseOldTwoPartWithEmbeddedSeparators(): Unit = { + assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Literal), Resource.fromString("Group::This:is:a:weird:group:name:")) + } + + @Test + def shouldParseThreePartString(): Unit = { + assertEquals(Resource(Group, "fred", Prefixed), Resource.fromString("Prefixed:Group:fred")) + assertEquals(Resource(Topic, "t", Literal), Resource.fromString("Literal:Topic:t")) + } + + @Test + def shouldParseThreePartWithEmbeddedSeparators(): Unit = { + assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Prefixed), Resource.fromString("Prefixed:Group::This:is:a:weird:group:name:")) + assertEquals(Resource(Group, ":This:is:a:weird:group:name:", Literal), Resource.fromString("Literal:Group::This:is:a:weird:group:name:")) + } + + @Test + def shouldRoundTripViaString(): Unit = { + val expected = Resource(Group, "fred", Prefixed) + + val actual = Resource.fromString(expected.toString) + + assertEquals(expected, actual) + } +} \ No newline at end of file diff --git 
a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 59f543b522a..f56b3b48789 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -24,12 +24,13 @@ import kafka.security.auth._ import kafka.utils.TestUtils import org.apache.kafka.clients.admin.NewPartitions import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} -import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType, Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.protocol.types.Struct +import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} @@ -312,12 +313,12 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CREATE_ACLS => new CreateAclsRequest.Builder(Collections.singletonList(new AclCreation(new AclBinding( - new AdminResource(AdminResourceType.TOPIC, "mytopic"), + new ResourcePattern(AdminResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))) case ApiKeys.DELETE_ACLS => new DeleteAclsRequest.Builder(Collections.singletonList(new AclBindingFilter( - new ResourceFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL), + new 
ResourcePatternFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL), new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))) case ApiKeys.DESCRIBE_CONFIGS => ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Investigate if it's possible to drop the ResourceNameType field from Java > Resource class. > ---------------------------------------------------------------------------------------- > > Key: KAFKA-7011 > URL: https://issues.apache.org/jira/browse/KAFKA-7011 > Project: Kafka > Issue Type: Sub-task > Components: core, security > Reporter: Andy Coates > Assignee: Andy Coates > Priority: Major > Fix For: 2.0.0 > > > Following on from the PR [#5117|https://github.com/apache/kafka/pull/5117] > and discussions with Colin McCabe... > > Current placement of ResourceNameType as a field in Resource class is ... less > than ideal. A Resource should be a concrete resource. Look to resolve this. > > Thoughts... > A. I guess you could subclass Resource and have ResourcePrefix - but there is > no 'is-a' relationship here and it would still allow > authorise(ResourcePrefix()) > B. You could move ResourceNameType into AccessControlEntryData - possible. > C. Move ResourceNameType directly into AclBinding / AclBindingFilter - > possible > -- This message was sent by Atlassian JIRA (v7.6.3#76005)