lucasbru commented on code in PR #19183: URL: https://github.com/apache/kafka/pull/19183#discussion_r1995234159
########## clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json: ########## @@ -30,11 +30,12 @@ // - FENCED_MEMBER_EPOCH (version 0+) // - UNRELEASED_INSTANCE_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) - // - TOPIC_AUTHORIZATION_FAILED (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - CLUSTER_AUTHORIZATION_FAILED (version 0+) // - STREAMS_INVALID_TOPOLOGY (version 0+) // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+) // - STREAMS_TOPOLOGY_FENCED (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) Review Comment: Duplicate: `TOPIC_AUTHORIZATION_FAILED` is already listed above. ########## clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java: ########## @@ -238,7 +238,9 @@ private void handleError( Set<CoordinatorKey> groupsToUnmap) { switch (error) { case GROUP_AUTHORIZATION_FAILED: + case TOPIC_AUTHORIZATION_FAILED: log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error); + // The topic auth response received on DescribeStreamsGroup is a generic one not including topic names, so we just pass it on unchanged here. Review Comment: What do you mean by this comment? This block also handles `GROUP_AUTHORIZATION_FAILED`, so it's odd that the comment only talks about `topic auth`. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2785,6 +2794,46 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. 
+ if (authorizer.isDefined) { + val topicsToCheck = response.groups.stream() + .flatMap(group => group.topology.subtopologies.stream) + .flatMap(subtopology => util.stream.Stream.of( + subtopology.sourceTopics, + subtopology.repartitionSinkTopics, + subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava, + subtopology.stateChangelogTopics.iterator.asScala.map(_.name).toList.asJava)) + .flatMap(_.stream) + .collect(Collectors.toSet[String]) + .asScala + + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + topicsToCheck)(identity) + + val updatedGroups = response.groups.stream.map { group => + val hasUnauthorizedTopic = group.topology.subtopologies.stream() + .flatMap(subtopology => util.stream.Stream.of( + subtopology.sourceTopics, + subtopology.repartitionSinkTopics, + subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava, + subtopology.stateChangelogTopics.iterator.asScala.map(_.name).toList.asJava)) + .flatMap(_.stream) + .anyMatch(topic => !authorizedTopics.contains(topic)) + + if (hasUnauthorizedTopic) { + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(group.groupId) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setErrorMessage("The group has described topic(s) that the client is not authorized to describe.") Review Comment: "The described group uses topics that the client is not authorized to describe." ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -10350,10 +10400,32 @@ class KafkaApisTest extends Logging { ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + val subtotplogy0 = new StreamsGroupDescribeResponseData.Subtopology() Review Comment: nit: `subtotplogy` -> `subtopology`. 
Same in other places ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -10006,6 +10006,54 @@ class KafkaApisTest extends Logging { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + val groupId = "group" + val fooTopicName = "foo" + val barTopicName = "bar" + val zarTopicName = "zar" + + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology( + new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(3) + .setSubtopologies( + Collections.singletonList(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology") + .setSourceTopics(Collections.singletonList(fooTopicName)) + .setRepartitionSinkTopics(Collections.singletonList(barTopicName)) + .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(zarTopicName))) Review Comment: We should include a changelog topic here as well. ########## clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java: ########## @@ -43,6 +43,7 @@ * - {@link Errors#STREAMS_INVALID_TOPOLOGY} * - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH} * - {@link Errors#STREAMS_TOPOLOGY_FENCED} + * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} Review Comment: duplicate ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2785,6 +2794,46 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. 
+ if (authorizer.isDefined) { + val topicsToCheck = response.groups.stream() + .flatMap(group => group.topology.subtopologies.stream) + .flatMap(subtopology => util.stream.Stream.of( + subtopology.sourceTopics, + subtopology.repartitionSinkTopics, + subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava, Review Comment: Couldn't you use `.stream().map(_.name)` instead of converting between Scala and Java? It seems the extra collection is unnecessary here. Something like ``` Stream.concat( subtopology.sourceTopics.stream, subtopology.repartitionSinkTopics.stream, subtopology.repartitionSourceTopics.stream.map(_.name), .... ) ``` ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2785,6 +2794,46 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. + if (authorizer.isDefined) { + val topicsToCheck = response.groups.stream() + .flatMap(group => group.topology.subtopologies.stream) + .flatMap(subtopology => util.stream.Stream.of( + subtopology.sourceTopics, + subtopology.repartitionSinkTopics, + subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava, + subtopology.stateChangelogTopics.iterator.asScala.map(_.name).toList.asJava)) + .flatMap(_.stream) + .collect(Collectors.toSet[String]) + .asScala + + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + topicsToCheck)(identity) + + val updatedGroups = response.groups.stream.map { group => + val hasUnauthorizedTopic = group.topology.subtopologies.stream() + .flatMap(subtopology => util.stream.Stream.of( + subtopology.sourceTopics, Review Comment: Can we avoid creating intermediate collections? Either by using `Stream.concat`, or by just calling `anyMatch` on each of the collections and combining the boolean results using `||` for individual collections. 
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -10006,6 +10006,54 @@ class KafkaApisTest extends Logging { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + val groupId = "group" + val fooTopicName = "foo" + val barTopicName = "bar" + val zarTopicName = "zar" + + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology( + new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(3) + .setSubtopologies( + Collections.singletonList(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology") Review Comment: nit: `Collections.singletonList` -> `List.of`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org