dajac commented on code in PR #18974: URL: https://github.com/apache/kafka/pull/18974#discussion_r1963276056
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Remove the unauthorized topics from the member assignments and target assignments. Review Comment: ditto about the comment. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception)) } else { + if (response.assignment != null) { + // Remove the unauthorized topics from the assignment. + val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + response.assignment.topicPartitions.asScala.flatMap(tp => metadataCache.getTopicName(tp.topicId)))(identity) + response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp => { Review Comment: nit: `.filter { tp =>`. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Remove the unauthorized topics from the member assignments and target assignments. + val topicsToCheck = response.groups.asScala.flatMap(_.members.asScala).flatMap { member => + member.assignment.topicPartitions.asScala.map(_.topicName) ++ + member.targetAssignment.topicPartitions.asScala.map(_.topicName) + }.toSet Review Comment: For this one, I am tempted by using a mutable Set. With it, we could basically iterate over all the groups, members and topics to populate it. It would avoid all the extract collections that we create here. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @Test + def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" + + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + def buildExpectedActions(topics: List[String]): util.List[Action] = { + topics.map { topic => + val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + new Action(AclOperation.DESCRIBE, pattern, 1, true, true) + }.asJava + } + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName))))) + .thenReturn(List(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName))))) + .thenReturn(List(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + featureVersions = Seq(GroupVersion.GV_1) + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val member0 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member0") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List(new TopicPartitions().setTopicName(fooTopicName)).asJava)) + val expectedMember0 = member0; + + val member1 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member1") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List(new TopicPartitions().setTopicName(fooTopicName), + new TopicPartitions().setTopicName(barTopicName)).asJava)) + val expectedMember1 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member1") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List(new TopicPartitions().setTopicName(fooTopicName)).asJava)) + + val member2 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member2") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List(new TopicPartitions().setTopicName(barTopicName)).asJava)) + val expectedMember2 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member2") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List.empty.asJava)) Review Comment: nit: It is empty by default. No need to set it. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Remove the unauthorized topics from the member assignments and target assignments. + val topicsToCheck = response.groups.asScala.flatMap(_.members.asScala).flatMap { member => + member.assignment.topicPartitions.asScala.map(_.topicName) ++ + member.targetAssignment.topicPartitions.asScala.map(_.topicName) + }.toSet + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + topicsToCheck)(identity) + response.groups.forEach(_.members.forEach { member => { + List(member.assignment, member.targetAssignment).foreach { assignment => + assignment.setTopicPartitions(assignment.topicPartitions.asScala.filter(tp => + authorizedTopics.contains(tp.topicName)).asJava)} + }}) Review Comment: nit: The format looks weird. ``` response.groups.forEach(_.members.forEach { member => List(member.assignment, member.targetAssignment).foreach { assignment => assignment.setTopicPartitions(assignment.topicPartitions.asScala.filter { tp => authorizedTopics.contains(tp.topicName) }.asJava) } }) ``` ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception)) } else { + if (response.assignment != null) { + // Remove the unauthorized topics from the assignment. Review Comment: nit: `Clients are not allowed to see topics that are not authorized for Describe.`? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @Test + def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" + + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.addAll(groupIds) Review Comment: nit: ``` new ConsumerGroupDescribeRequestData() .setGroupIds(groupIds) ``` ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -9871,6 +9871,69 @@ class KafkaApisTest extends Logging { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupHeartbeatFilterUnauthorizedTopics(): Unit = { + val memberId = Uuid.randomUuid.toString + val fooTopicName = "foo" + val barTopicName = "bar" + val fooTopicId = Uuid.randomUuid + val barTopicId = Uuid.randomUuid + val zarTopicId = Uuid.randomUuid + + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.getTopicName(fooTopicId)).thenReturn(Some(fooTopicName)) + when(metadataCache.getTopicName(barTopicId)).thenReturn(Some(barTopicName)) + when(metadataCache.getTopicName(zarTopicId)).thenReturn(None) + + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build()) + + val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]() + when(groupCoordinator.consumerGroupHeartbeat( + requestChannelRequest.context, + consumerGroupHeartbeatRequest + )).thenReturn(future) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + def buildExpectedActions(topics: List[String]): util.List[Action] = { + topics.map { topic => + val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + new Action(AclOperation.DESCRIBE, pattern, 1, true, true) + }.asJava + } + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName))))) + .thenReturn(List(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName))))) + .thenReturn(List(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava) Review Comment: I recall using a nice pattern to do this. Take a look into `testHandleOffsetFetchAuthorization`. It would avoid having to specify all the combinations. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @Test + def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" + + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + def buildExpectedActions(topics: List[String]): util.List[Action] = { + topics.map { topic => + val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + new Action(AclOperation.DESCRIBE, pattern, 1, true, true) + }.asJava + } + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName))))) + .thenReturn(List(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName))))) + .thenReturn(List(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava) Review Comment: ditto. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @Test + def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" + + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + def buildExpectedActions(topics: List[String]): util.List[Action] = { + topics.map { topic => + val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + new Action(AclOperation.DESCRIBE, pattern, 1, true, true) + }.asJava + } + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName))))) + .thenReturn(List(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName))))) + .thenReturn(List(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + featureVersions = Seq(GroupVersion.GV_1) + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val member0 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member0") Review Comment: nit: `setMemberId` should be on next line. This comment applies to all the members. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @Test + def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" + + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + def buildExpectedActions(topics: List[String]): util.List[Action] = { + topics.map { topic => + val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + new Action(AclOperation.DESCRIBE, pattern, 1, true, true) + }.asJava + } + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName))))) + .thenReturn(List(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED).asJava) + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName))))) + .thenReturn(List(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + featureVersions = Seq(GroupVersion.GV_1) + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val member0 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member0") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List(new TopicPartitions().setTopicName(fooTopicName)).asJava)) + val expectedMember0 = member0; + + val member1 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member1") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List(new TopicPartitions().setTopicName(fooTopicName), + new TopicPartitions().setTopicName(barTopicName)).asJava)) Review Comment: nit: ``` .setTopicPartitions(List( new TopicPartitions().setTopicName(fooTopicName), new TopicPartitions().setTopicName(barTopicName)).asJava)) ``` ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception)) } else { + if (response.assignment != null) { + // Remove the unauthorized topics from the assignment. + val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + response.assignment.topicPartitions.asScala.flatMap(tp => metadataCache.getTopicName(tp.topicId)))(identity) + response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp => { + val topicName = metadataCache.getTopicName(tp.topicId) + !topicName.isEmpty && authorizedForDescribeTopics.contains(topicName.get) Review Comment: nit: Could use use `topicName.nonEmpty`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org