AndrewJSchofield commented on code in PR #19540: URL: https://github.com/apache/kafka/pull/19540#discussion_r2063830503
########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -2609,6 +2777,475 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(0), fullRequest = true) } + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + private def createShareGroupToDescribe(): Unit = { + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), shareGroupResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) + shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup) + val consumer = createShareConsumer() + consumer.subscribe(Collections.singleton(topic)) + consumer.poll(Duration.ofMillis(500L)) + removeAllClientAcls() + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithOperationAll(quorum: String): Unit = { + createShareGroupToDescribe() + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithGroupReadAndTopicReadAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutTopicReadeAcl(quorum: String): Unit = { + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + + val request = createShareFetchRequest + val response = connectAndReceive[ShareFetchResponse](request, listenerName = listenerName) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.data.responses.get(0).partitions.get(0).errorCode)) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeWithGroupReadAndTopicReadAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareAcknowledgeFetchWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = shareAcknowledgeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testInitializeShareGroupStateWithClusterAcl(quorum: String): Unit = { + addAndVerifyAcls(clusterAcl(clusterResource), clusterResource) + + val request = initializeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testInitializeShareGroupStateWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource) + + val request = initializeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testInitializeShareGroupStateWithoutClusterAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = initializeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testReadShareGroupStateWithClusterAcl(quorum: String): Unit = { + addAndVerifyAcls(clusterAcl(clusterResource), clusterResource) + + val request = readShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testReadShareGroupStateWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource) + + val request = readShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testReadShareGroupStateWithoutClusterAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = readShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testWriteShareGroupStateWithClusterAcl(quorum: String): Unit = { + addAndVerifyAcls(clusterAcl(clusterResource), clusterResource) + + val request = writeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testWriteShareGroupStateWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource) + + val request = writeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testWriteShareGroupStateWithoutClusterAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = writeShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testDeleteShareGroupStateWithClusterAcl(quorum: String): Unit = { + addAndVerifyAcls(clusterAcl(clusterResource), clusterResource) + + val request = deleteShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testDeleteShareGroupStateWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource) + + val request = deleteShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testDeleteShareGroupStateWithoutClusterAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = deleteShareGroupStateRequest + val resource = Set[ResourceType](CLUSTER) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testReadShareGroupStateSummaryWithClusterAcl(quorum: String): Unit = { + addAndVerifyAcls(clusterAcl(clusterResource), clusterResource) + + val request = readShareGroupStateRequest Review Comment: `readShareGroupStateSummary` is different than `readShareGroupState`. ########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -2609,6 +2777,475 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(0), fullRequest = true) } + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + + val request = shareGroupHeartbeatRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + private def createShareGroupToDescribe(): Unit = { + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), shareGroupResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) + shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup) + val consumer = createShareConsumer() + consumer.subscribe(Collections.singleton(topic)) + consumer.poll(Duration.ofMillis(500L)) + removeAllClientAcls() + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithOperationAll(quorum: String): Unit = { + createShareGroupToDescribe() + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = { + createShareGroupToDescribe() + + val request = shareGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithGroupReadAndTopicReadAcl(quorum: String): Unit = { + addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource) + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicReadAcl(topicResource), topicResource) + + val request = createShareFetchRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kip932")) + def testShareFetchWithoutTopicReadeAcl(quorum: String): Unit = { Review Comment: nit: `ReadAcl` not `ReadeAcl`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org