lucasbru commented on code in PR #19183:
URL: https://github.com/apache/kafka/pull/19183#discussion_r1995234159


##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -30,11 +30,12 @@
   // - FENCED_MEMBER_EPOCH (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
-  // - TOPIC_AUTHORIZATION_FAILED (version 0+) 
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
   // - CLUSTER_AUTHORIZATION_FAILED (version 0+)
   // - STREAMS_INVALID_TOPOLOGY (version 0+)
   // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
   // - STREAMS_TOPOLOGY_FENCED (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)

Review Comment:
   duplicate



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java:
##########
@@ -238,7 +238,9 @@ private void handleError(
             Set<CoordinatorKey> groupsToUnmap) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
+            case TOPIC_AUTHORIZATION_FAILED:
                 log.debug("`DescribeStreamsGroups` request for group id {} 
failed due to error {}", groupId.idValue, error);
+                // The topic auth response received on DescribeStreamsGroup is 
a generic one not including topic names, so we just pass it on unchanged here.

Review Comment:
   what do you mean by this comment? Also, this block also handles 
`GROUP_AUTHORIZATION_FAILED`, so it's weird that you're just talking about 
`topic auth`.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2785,6 +2794,46 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.
+          if (authorizer.isDefined) {
+            val topicsToCheck = response.groups.stream()
+              .flatMap(group => group.topology.subtopologies.stream)
+              .flatMap(subtopology => util.stream.Stream.of(
+                subtopology.sourceTopics,
+                subtopology.repartitionSinkTopics,
+                
subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava,
+                
subtopology.stateChangelogTopics.iterator.asScala.map(_.name).toList.asJava))
+              .flatMap(_.stream)
+              .collect(Collectors.toSet[String])
+              .asScala
+
+            val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+              topicsToCheck)(identity)
+
+            val updatedGroups = response.groups.stream.map { group =>
+              val hasUnauthorizedTopic = group.topology.subtopologies.stream()
+                .flatMap(subtopology => util.stream.Stream.of(
+                  subtopology.sourceTopics,
+                  subtopology.repartitionSinkTopics,
+                  
subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava,
+                  
subtopology.stateChangelogTopics.iterator.asScala.map(_.name).toList.asJava))
+                .flatMap(_.stream)
+                .anyMatch(topic => !authorizedTopics.contains(topic))
+
+              if (hasUnauthorizedTopic) {
+                new StreamsGroupDescribeResponseData.DescribedGroup()
+                  .setGroupId(group.groupId)
+                  .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+                  .setErrorMessage("The group has described topic(s) that the 
client is not authorized to describe.")

Review Comment:
   "The described group uses topics that the client is not authorized to 
describe."



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10350,10 +10400,32 @@ class KafkaApisTest extends Logging {
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
+    val subtotplogy0 = new StreamsGroupDescribeResponseData.Subtopology()

Review Comment:
   nit: `subtotplogy` -> `subtopology`. Same in other places



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10006,6 +10006,54 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    val groupId = "group"
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val zarTopicName = "zar"
+
+    val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology(
+      new StreamsGroupHeartbeatRequestData.Topology()
+        .setEpoch(3)
+        .setSubtopologies(
+          Collections.singletonList(new 
StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
+            .setSourceTopics(Collections.singletonList(fooTopicName))
+            .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+            .setRepartitionSourceTopics(Collections.singletonList(new 
StreamsGroupHeartbeatRequestData.TopicInfo().setName(zarTopicName)))

Review Comment:
   We should include a changelog topic here as well.



##########
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java:
##########
@@ -43,6 +43,7 @@
  * - {@link Errors#STREAMS_INVALID_TOPOLOGY}
  * - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH}
  * - {@link Errors#STREAMS_TOPOLOGY_FENCED}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}

Review Comment:
   duplicate



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2785,6 +2794,46 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.
+          if (authorizer.isDefined) {
+            val topicsToCheck = response.groups.stream()
+              .flatMap(group => group.topology.subtopologies.stream)
+              .flatMap(subtopology => util.stream.Stream.of(
+                subtopology.sourceTopics,
+                subtopology.repartitionSinkTopics,
+                
subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava,

Review Comment:
   Couldn't you use `.stream().map(_name)` instead of converting between scala 
and java? It seems the extra collection is unncessary here.
   
   Something like
   
   ```
   Stream.concat(
                   subtopology.sourceTopics.stream,
                   subtopology.repartitionSinkTopics.stream,
                   subtopology.repartitionSourceTopics.stream.map(_.name),
                                ....
   )
   ```



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2785,6 +2794,46 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.
+          if (authorizer.isDefined) {
+            val topicsToCheck = response.groups.stream()
+              .flatMap(group => group.topology.subtopologies.stream)
+              .flatMap(subtopology => util.stream.Stream.of(
+                subtopology.sourceTopics,
+                subtopology.repartitionSinkTopics,
+                
subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava,
+                
subtopology.stateChangelogTopics.iterator.asScala.map(_.name).toList.asJava))
+              .flatMap(_.stream)
+              .collect(Collectors.toSet[String])
+              .asScala
+
+            val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+              topicsToCheck)(identity)
+
+            val updatedGroups = response.groups.stream.map { group =>
+              val hasUnauthorizedTopic = group.topology.subtopologies.stream()
+                .flatMap(subtopology => util.stream.Stream.of(
+                  subtopology.sourceTopics,

Review Comment:
   Can we avoid creating intermediate collections? Either by using 
`Stream.concat`, or by just calling `anyMatch` on each of the collections and 
combining the boolean results using `||` for individual collections.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10006,6 +10006,54 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    val groupId = "group"
+    val fooTopicName = "foo"
+    val barTopicName = "bar"
+    val zarTopicName = "zar"
+
+    val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology(
+      new StreamsGroupHeartbeatRequestData.Topology()
+        .setEpoch(3)
+        .setSubtopologies(
+          Collections.singletonList(new 
StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")

Review Comment:
   nit: `Collections.singletonList` -> `List.of`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to