dajac commented on code in PR #18974:
URL: https://github.com/apache/kafka/pull/18974#discussion_r1963276056


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Remove the unauthorized topics from the member assignments and 
target assignments.

Review Comment:
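   Ditto about the comment: consider rewording it along the lines of 
`Clients are not allowed to see topics that are not authorized for Describe.`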



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(exception))
         } else {
+          if (response.assignment != null) {
+            // Remove the unauthorized topics from the assignment.
+            val authorizedForDescribeTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+              response.assignment.topicPartitions.asScala.flatMap(tp => 
metadataCache.getTopicName(tp.topicId)))(identity)
+            
response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp
 => {

Review Comment:
   nit: `.filter { tp =>`.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Remove the unauthorized topics from the member assignments and 
target assignments.
+          val topicsToCheck = 
response.groups.asScala.flatMap(_.members.asScala).flatMap { member =>
+            member.assignment.topicPartitions.asScala.map(_.topicName) ++
+              member.targetAssignment.topicPartitions.asScala.map(_.topicName)
+          }.toSet

Review Comment:
   For this one, I am tempted to use a mutable Set. With it, we could 
basically iterate over all the groups, members and topics to populate it. It 
would avoid all the extra collections that we create here.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, 
response.data.groups.get(0).errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    def buildExpectedActions(topics: List[String]): util.List[Action] = {
+      topics.map { topic =>
+        val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
+        new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
+      }.asJava
+    }
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName)))))
+      .thenReturn(List(AuthorizationResult.ALLOWED, 
AuthorizationResult.DENIED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName)))))
+      .thenReturn(List(AuthorizationResult.DENIED, 
AuthorizationResult.ALLOWED).asJava)
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      featureVersions = Seq(GroupVersion.GV_1)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val member0 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member0")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(new 
TopicPartitions().setTopicName(fooTopicName)).asJava))
+    val expectedMember0 = member0;
+
+    val member1 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member1")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(new 
TopicPartitions().setTopicName(fooTopicName),
+          new TopicPartitions().setTopicName(barTopicName)).asJava))
+    val expectedMember1 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member1")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(new 
TopicPartitions().setTopicName(fooTopicName)).asJava))
+
+    val member2 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(new 
TopicPartitions().setTopicName(barTopicName)).asJava))
+    val expectedMember2 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List.empty.asJava))

Review Comment:
   nit: It is empty by default. No need to set it.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Remove the unauthorized topics from the member assignments and 
target assignments.
+          val topicsToCheck = 
response.groups.asScala.flatMap(_.members.asScala).flatMap { member =>
+            member.assignment.topicPartitions.asScala.map(_.topicName) ++
+              member.targetAssignment.topicPartitions.asScala.map(_.topicName)
+          }.toSet
+          val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+            topicsToCheck)(identity)
+          response.groups.forEach(_.members.forEach { member => {
+            List(member.assignment, member.targetAssignment).foreach { 
assignment =>
+              
assignment.setTopicPartitions(assignment.topicPartitions.asScala.filter(tp =>
+                authorizedTopics.contains(tp.topicName)).asJava)}
+          }})

Review Comment:
   nit: The formatting looks off. Suggested layout:
   
   ```
             response.groups.forEach(_.members.forEach { member =>
               List(member.assignment, member.targetAssignment).foreach { 
assignment =>
                 
assignment.setTopicPartitions(assignment.topicPartitions.asScala.filter { tp =>
                   authorizedTopics.contains(tp.topicName)
                 }.asJava)
               }
             })
   ```



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(exception))
         } else {
+          if (response.assignment != null) {
+            // Remove the unauthorized topics from the assignment.

Review Comment:
   nit: `Clients are not allowed to see topics that are not authorized for 
Describe.`?
   



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, 
response.data.groups.get(0).errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.addAll(groupIds)

Review Comment:
   nit: 
   ```
   new ConsumerGroupDescribeRequestData()
       .setGroupIds(groupIds)
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -9871,6 +9871,69 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupHeartbeatFilterUnauthorizedTopics(): Unit = {
+    val memberId = Uuid.randomUuid.toString
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val fooTopicId = Uuid.randomUuid
+    val barTopicId = Uuid.randomUuid
+    val zarTopicId = Uuid.randomUuid
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.getTopicName(fooTopicId)).thenReturn(Some(fooTopicName))
+    when(metadataCache.getTopicName(barTopicId)).thenReturn(Some(barTopicName))
+    when(metadataCache.getTopicName(zarTopicId)).thenReturn(None)
+
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+    val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
+    when(groupCoordinator.consumerGroupHeartbeat(
+      requestChannelRequest.context,
+      consumerGroupHeartbeatRequest
+    )).thenReturn(future)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    def buildExpectedActions(topics: List[String]): util.List[Action] = {
+      topics.map { topic =>
+        val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
+        new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
+      }.asJava
+    }
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName)))))
+      .thenReturn(List(AuthorizationResult.ALLOWED, 
AuthorizationResult.DENIED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName)))))
+      .thenReturn(List(AuthorizationResult.DENIED, 
AuthorizationResult.ALLOWED).asJava)

Review Comment:
   I recall using a nice pattern to do this. Take a look at 
`testHandleOffsetFetchAuthorization`. It would avoid having to specify all the 
combinations.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, 
response.data.groups.get(0).errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    def buildExpectedActions(topics: List[String]): util.List[Action] = {
+      topics.map { topic =>
+        val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
+        new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
+      }.asJava
+    }
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName)))))
+      .thenReturn(List(AuthorizationResult.ALLOWED, 
AuthorizationResult.DENIED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName)))))
+      .thenReturn(List(AuthorizationResult.DENIED, 
AuthorizationResult.ALLOWED).asJava)

Review Comment:
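   Ditto: the pattern from `testHandleOffsetFetchAuthorization` would avoid 
having to specify all the combinations here as well.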



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, 
response.data.groups.get(0).errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    def buildExpectedActions(topics: List[String]): util.List[Action] = {
+      topics.map { topic =>
+        val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
+        new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
+      }.asJava
+    }
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName)))))
+      .thenReturn(List(AuthorizationResult.ALLOWED, 
AuthorizationResult.DENIED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName)))))
+      .thenReturn(List(AuthorizationResult.DENIED, 
AuthorizationResult.ALLOWED).asJava)
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      featureVersions = Seq(GroupVersion.GV_1)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val member0 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member0")

Review Comment:
   nit: `setMemberId` should be on the next line. This comment applies to all 
the members.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -9998,6 +10061,82 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, 
response.data.groups.get(0).errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    def buildExpectedActions(topics: List[String]): util.List[Action] = {
+      topics.map { topic =>
+        val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
+        new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
+      }.asJava
+    }
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName)))))
+      .thenReturn(List(AuthorizationResult.ALLOWED, 
AuthorizationResult.DENIED).asJava)
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName)))))
+      .thenReturn(List(AuthorizationResult.DENIED, 
AuthorizationResult.ALLOWED).asJava)
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      featureVersions = Seq(GroupVersion.GV_1)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val member0 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member0")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(new 
TopicPartitions().setTopicName(fooTopicName)).asJava))
+    val expectedMember0 = member0;
+
+    val member1 = new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member1")
+      .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+        .setTopicPartitions(List(new 
TopicPartitions().setTopicName(fooTopicName),
+          new TopicPartitions().setTopicName(barTopicName)).asJava))

Review Comment:
   nit:
   
   ```
   .setTopicPartitions(List(
       new TopicPartitions().setTopicName(fooTopicName),
       new TopicPartitions().setTopicName(barTopicName)).asJava))
   ```



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(exception))
         } else {
+          if (response.assignment != null) {
+            // Remove the unauthorized topics from the assignment.
+            val authorizedForDescribeTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+              response.assignment.topicPartitions.asScala.flatMap(tp => 
metadataCache.getTopicName(tp.topicId)))(identity)
+            
response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp
 => {
+              val topicName = metadataCache.getTopicName(tp.topicId)
+              !topicName.isEmpty && 
authorizedForDescribeTopics.contains(topicName.get)

Review Comment:
   nit: Could we use `topicName.nonEmpty`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to