dajac commented on code in PR #18989:
URL: https://github.com/apache/kafka/pull/18989#discussion_r1966467863

##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -30,6 +30,7 @@
   // - UNSUPPORTED_ASSIGNOR (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)

Review Comment:
   Let's also add it to ConsumerGroupDescribeResponse.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2603,9 +2648,54 @@ public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
         return result;
     }
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context The request context.
+     * @param authorizer The authorizer.
+     * @param resolvedRegexes The map of the regex patter and its set of matched topics.
+     * @return The set of topics that the member is not authorized to describe.
+     */
+    private static Set<String> filterTopicDescribeAuthorizedTopics(
+        RequestContext context,
+        Optional<Authorizer> authorizer,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizer.isPresent()) {
+            Map<String, Integer> topicNameCount = new HashMap<>();
+            resolvedRegexes.values().forEach(topicNames ->
+                topicNames.forEach(topicName ->
+                    topicNameCount.compute(topicName, Utils::incValue)
+                )
+            );
+
+            List<Action> actions = topicNameCount.entrySet().stream().map(entry -> {
+                ResourcePattern resource = new ResourcePattern(TOPIC, entry.getKey(), LITERAL);
+                return new Action(DESCRIBE, resource, entry.getValue(), true, true);
+            }).collect(Collectors.toList());
+
+            List<AuthorizationResult> authorizationResults = authorizer.get().authorize(context, actions);
+            Set<String> deniedTopics = new HashSet<>();
+            IntStream.range(0, actions.size()).forEach(i -> {
+                if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
+                    String deniedTopic = actions.get(i).resourcePattern().name();
+                    deniedTopics.add(deniedTopic);
+                }
+            });
+
+            resolvedRegexes.forEach((__, topicNames) -> topicNames.removeIf(deniedTopics::contains));
+
+            return deniedTopics;
+        }
+        return Collections.emptySet();
+    }
+
+    /**
      * Handle the result of the asynchronous tasks which resolves the regular expressions.
      *
+     * @param groupId The group id.

Review Comment:
   Could we pass the member id and extend the log `"[GroupId {}] Received updated regular expressions: {}."` to include it? This is useful to know that they were resolved based on the context of the member X.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16603,6 +16606,141 @@ foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1)
         );
     }
+    @Test
+    public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        Authorizer authorizer = mock(Authorizer.class);
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(12345L))
+            .withAuthorizer(authorizer)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        Map<String, AuthorizationResult> acls = Map.of(
+            fooTopicName, AuthorizationResult.ALLOWED,
+            barTopicName, AuthorizationResult.DENIED
+        );
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1, List.class);
+            return actions.stream()
+                .map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+                .collect(Collectors.toList());
+        });
+
+        // Member 2 joins with a different regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("bar*")

Review Comment:
   Should we use a regex which matches both foo and bar to ensure that the filtering works with partial result?

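To make the partial-result case concrete, here is a minimal sketch of what such a test would exercise. It is not the coordinator code: `PartialRegexFilterSketch` and `resolveAuthorized` are hypothetical names, the denied set stands in for the mocked `Authorizer`, and `java.util.regex` with full matching is only assumed as a stand-in for the regex engine the coordinator actually uses.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical illustration only; none of these names exist in Kafka.
class PartialRegexFilterSketch {

    // Returns the topics that fully match the regex and are not denied.
    static Set<String> resolveAuthorized(String regex, Set<String> topics, Set<String> denied) {
        Pattern pattern = Pattern.compile(regex);
        Set<String> result = new TreeSet<>();
        for (String topic : topics) {
            // Keep a topic only if the regex matches it AND the caller may describe it.
            if (pattern.matcher(topic).matches() && !denied.contains(topic)) {
                result.add(topic);
            }
        }
        return result;
    }
}
```

With topics `foo` and `bar` and `bar` denied, a regex such as `foo|bar` resolves to `foo` only, so the test would cover the case where part of the match survives the filtering. With `bar*` the resolved set is simply empty, which does not distinguish "filtered" from "nothing matched".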
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2587,6 +2621,17 @@ public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
             }
         }
+        Set<String> deniedTopics = filterTopicDescribeAuthorizedTopics(
+            context,
+            authorizer,
+            resolvedRegexes
+        );
+
+        if (log.isDebugEnabled()) {
+            log.debug("[GroupId {}] Member {} is not authorized to describe topics: {}.",
+                groupId, memberId, deniedTopics);
+        }

Review Comment:
   This log won't be very useful like this. I would remove it.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2603,9 +2648,54 @@ public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
         return result;
     }
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context The request context.
+     * @param authorizer The authorizer.
+     * @param resolvedRegexes The map of the regex patter and its set of matched topics.
+     * @return The set of topics that the member is not authorized to describe.
+     */
+    private static Set<String> filterTopicDescribeAuthorizedTopics(
+        RequestContext context,
+        Optional<Authorizer> authorizer,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizer.isPresent()) {

Review Comment:
   nit: We could invert the condition and return early.

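As a rough illustration of the early-return shape, here is a self-contained sketch. It is an assumption-laden stand-in, not the PR's method: `EarlyReturnFilterSketch` and `filterDeniedTopics` are hypothetical names, and a plain `Predicate<String>` replaces the real `Authorizer` and `RequestContext` so the example compiles on its own.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the helper under review; not the Kafka Authorizer API.
class EarlyReturnFilterSketch {

    // `canDescribe` is an assumed substitute for the authorizer: it returns true
    // when the caller is allowed to describe the given topic.
    static Set<String> filterDeniedTopics(
        Optional<Predicate<String>> canDescribe,
        Map<String, Set<String>> resolvedRegexes
    ) {
        // Inverted condition: without an authorizer there is nothing to filter.
        if (canDescribe.isEmpty()) return Collections.emptySet();

        // Collect every matched topic that the caller may not describe.
        Set<String> deniedTopics = new HashSet<>();
        for (Set<String> topicNames : resolvedRegexes.values()) {
            for (String topicName : topicNames) {
                if (!canDescribe.get().test(topicName)) deniedTopics.add(topicName);
            }
        }

        // Drop the denied topics from each resolved regex (the sets must be mutable).
        resolvedRegexes.values().forEach(topicNames -> topicNames.removeAll(deniedTopics));
        return deniedTopics;
    }
}
```

The main path then sits one indentation level lower, and the no-authorizer case is visible at the top of the method.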
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16603,6 +16606,141 @@ foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1)
         );
     }
+    @Test
+    public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        Authorizer authorizer = mock(Authorizer.class);
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(12345L))
+            .withAuthorizer(authorizer)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        Map<String, AuthorizationResult> acls = Map.of(
+            fooTopicName, AuthorizationResult.ALLOWED,
+            barTopicName, AuthorizationResult.DENIED
+        );
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1, List.class);
+            return actions.stream()
+                .map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+                .collect(Collectors.toList());
+        });
+
+        // Member 2 joins with a different regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("bar*")
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()),
+            ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+        );
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Collections.singletonList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(3, 4, 5))))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("bar*")
+            .setServerAssignorName("range")
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
+            GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
+        );
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Execute pending tasks.
+        List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks = context.processTasks();
+        assertEquals(
+            List.of(
+                new MockCoordinatorExecutor.ExecutorResult<>(
+                    groupId + "-regex",
+                    new CoordinatorResult<>(Collections.singletonList(

Review Comment:
   nit: List.of

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2603,9 +2648,54 @@ public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
         return result;
     }
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context The request context.
+     * @param authorizer The authorizer.
+     * @param resolvedRegexes The map of the regex patter and its set of matched topics.
+     * @return The set of topics that the member is not authorized to describe.
+     */
+    private static Set<String> filterTopicDescribeAuthorizedTopics(
+        RequestContext context,
+        Optional<Authorizer> authorizer,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizer.isPresent()) {
+            Map<String, Integer> topicNameCount = new HashMap<>();
+            resolvedRegexes.values().forEach(topicNames ->
+                topicNames.forEach(topicName ->
+                    topicNameCount.compute(topicName, Utils::incValue)
+                )
+            );
+
+            List<Action> actions = topicNameCount.entrySet().stream().map(entry -> {
+                ResourcePattern resource = new ResourcePattern(TOPIC, entry.getKey(), LITERAL);
+                return new Action(DESCRIBE, resource, entry.getValue(), true, true);
+            }).collect(Collectors.toList());
+
+            List<AuthorizationResult> authorizationResults = authorizer.get().authorize(context, actions);
+            Set<String> deniedTopics = new HashSet<>();
+            IntStream.range(0, actions.size()).forEach(i -> {
+                if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
+                    String deniedTopic = actions.get(i).resourcePattern().name();
+                    deniedTopics.add(deniedTopic);
+                }
+            });
+
+            resolvedRegexes.forEach((__, topicNames) -> topicNames.removeIf(deniedTopics::contains));

Review Comment:
   nit: You may be able to use removeAll.

##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -522,6 +524,13 @@ public CoordinatorShardBuilder<MockCoordinatorShard, String> withTopicPartition(
             return this;
         }
+        @Override
+        public CoordinatorShardBuilder<MockCoordinatorShard, String> withAuthorizer(
+            Optional<Authorizer> authorizer
+        ) {
+            return this;
+        }
+

Review Comment:
   Can we remove this one now?

##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,6 +2607,28 @@ class KafkaApis(val requestChannel: RequestChannel,
         response.groups.addAll(results)
       }
+      // Clients are not allowed to see topics that are not authorized for Describe.
+      val updatedGroups = response.groups.asScala.map { group => {
+        val topicsToCheck = group.members.asScala.flatMap(member =>
+          List(member.assignment, member.targetAssignment).flatMap(_.topicPartitions.asScala.map(_.topicName)))
+
+        val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,

Review Comment:
   I left a comment in our previous thread about this one.

##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,6 +2607,32 @@ class KafkaApis(val requestChannel: RequestChannel,
         response.groups.addAll(results)
       }
+      // Clients are not allowed to see topics that are not authorized for Describe.
+      var topicsToCheck = Set[String]()
+      response.groups.forEach(_.members.forEach { member =>
+        List(member.assignment, member.targetAssignment).foreach { assignment =>
+          assignment.topicPartitions.asScala.foreach { tp =>
+            topicsToCheck += tp.topicName
+          }
+        }
+      })
+      val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,

Review Comment:
   @dongnuo123 I am sorry. You were right. I think that I misunderstood the code last night. I should not work in the evening... Aggregating topic names makes sense to reduce the number of authorize calls because we authorize based on the identity of the caller, not based on the group. Then we check each individual group based on the authorized topics.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org