dajac commented on code in PR #18989:
URL: https://github.com/apache/kafka/pull/18989#discussion_r1966467863


##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##########
@@ -30,6 +30,7 @@
   // - UNSUPPORTED_ASSIGNOR (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
+  // - TOPIC_AUTHORIZATION_FAILED (version 0+)

Review Comment:
   Let's also add it to ConsumerGroupDescribeResponse.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2603,9 +2648,54 @@ public static Map<String, ResolvedRegularExpression> 
refreshRegularExpressions(
         return result;
     }
 
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context           The request context.
+     * @param authorizer        The authorizer.
+     * @param resolvedRegexes   The map of the regex patter and its set of 
matched topics.
+     * @return The set of topics that the member is not authorized to describe.
+     */
+    private static Set<String> filterTopicDescribeAuthorizedTopics(
+        RequestContext context,
+        Optional<Authorizer> authorizer,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizer.isPresent()) {
+            Map<String, Integer> topicNameCount = new HashMap<>();
+            resolvedRegexes.values().forEach(topicNames ->
+                topicNames.forEach(topicName ->
+                    topicNameCount.compute(topicName, Utils::incValue)
+                )
+            );
+
+            List<Action> actions = 
topicNameCount.entrySet().stream().map(entry -> {
+                ResourcePattern resource = new ResourcePattern(TOPIC, 
entry.getKey(), LITERAL);
+                return new Action(DESCRIBE, resource, entry.getValue(), true, 
true);
+            }).collect(Collectors.toList());
+
+            List<AuthorizationResult> authorizationResults = 
authorizer.get().authorize(context, actions);
+            Set<String> deniedTopics = new HashSet<>();
+            IntStream.range(0, actions.size()).forEach(i -> {
+                if (authorizationResults.get(i) == AuthorizationResult.DENIED) 
{
+                    String deniedTopic = 
actions.get(i).resourcePattern().name();
+                    deniedTopics.add(deniedTopic);
+                }
+            });
+
+            resolvedRegexes.forEach((__, topicNames) -> 
topicNames.removeIf(deniedTopics::contains));
+
+            return deniedTopics;
+        }
+        return Collections.emptySet();
+    }
+
+
     /**
      * Handle the result of the asynchronous tasks which resolves the regular 
expressions.
      *
+     * @param groupId                       The group id.

Review Comment:
   Could we pass the member id and extend the log `"[GroupId {}] Received 
updated regular expressions: {}.` to include it? This is useful to know that 
they were resolved based on the context of the member X.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16603,6 +16606,141 @@ foooTopicName, new TopicMetadata(foooTopicId, 
foooTopicName, 1)
         );
     }
 
+    @Test
+    public void 
testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        Authorizer authorizer = mock(Authorizer.class);
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(12345L))
+            .withAuthorizer(authorizer)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        Map<String, AuthorizationResult> acls = Map.of(
+            fooTopicName, AuthorizationResult.ALLOWED,
+            barTopicName, AuthorizationResult.DENIED
+        );
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1, List.class);
+            return actions.stream()
+                .map(action -> 
acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+                .collect(Collectors.toList());
+        });
+
+        // Member 2 joins with a different regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("bar*")

Review Comment:
   Should we use a regex which matches both foo and bar to ensure that the 
filtering works with partial result?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2587,6 +2621,17 @@ public static Map<String, ResolvedRegularExpression> 
refreshRegularExpressions(
             }
         }
 
+        Set<String> deniedTopics = filterTopicDescribeAuthorizedTopics(
+            context,
+            authorizer,
+            resolvedRegexes
+        );
+
+        if (log.isDebugEnabled()) {
+            log.debug("[GroupId {}] Member {} is not authorized to describe 
topics: {}.",
+                groupId, memberId, deniedTopics);
+        }

Review Comment:
   This log won't be very useful like this. I would remove it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2603,9 +2648,54 @@ public static Map<String, ResolvedRegularExpression> 
refreshRegularExpressions(
         return result;
     }
 
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context           The request context.
+     * @param authorizer        The authorizer.
+     * @param resolvedRegexes   The map of the regex patter and its set of 
matched topics.
+     * @return The set of topics that the member is not authorized to describe.
+     */
+    private static Set<String> filterTopicDescribeAuthorizedTopics(
+        RequestContext context,
+        Optional<Authorizer> authorizer,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizer.isPresent()) {

Review Comment:
   nit: We could invert the condition and return early.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16603,6 +16606,141 @@ foooTopicName, new TopicMetadata(foooTopicId, 
foooTopicName, 1)
         );
     }
 
+    @Test
+    public void 
testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        Authorizer authorizer = mock(Authorizer.class);
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(12345L))
+            .withAuthorizer(authorizer)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        Map<String, AuthorizationResult> acls = Map.of(
+            fooTopicName, AuthorizationResult.ALLOWED,
+            barTopicName, AuthorizationResult.DENIED
+        );
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1, List.class);
+            return actions.stream()
+                .map(action -> 
acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+                .collect(Collectors.toList());
+        });
+
+        // Member 2 joins with a different regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("bar*")
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()),
+            ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+        );
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Collections.singletonList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(3, 4, 5))))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("bar*")
+            .setServerAssignorName("range")
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*")
+        );
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Execute pending tasks.
+        List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks 
= context.processTasks();
+        assertEquals(
+            List.of(
+                new MockCoordinatorExecutor.ExecutorResult<>(
+                    groupId + "-regex",
+                    new CoordinatorResult<>(Collections.singletonList(

Review Comment:
   nit: List.of



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2603,9 +2648,54 @@ public static Map<String, ResolvedRegularExpression> 
refreshRegularExpressions(
         return result;
     }
 
+    /**
+     * This method filters the topics in the resolved regexes
+     * that the member is authorized to describe.
+     *
+     * @param context           The request context.
+     * @param authorizer        The authorizer.
+     * @param resolvedRegexes   The map of the regex patter and its set of 
matched topics.
+     * @return The set of topics that the member is not authorized to describe.
+     */
+    private static Set<String> filterTopicDescribeAuthorizedTopics(
+        RequestContext context,
+        Optional<Authorizer> authorizer,
+        Map<String, Set<String>> resolvedRegexes
+    ) {
+        if (authorizer.isPresent()) {
+            Map<String, Integer> topicNameCount = new HashMap<>();
+            resolvedRegexes.values().forEach(topicNames ->
+                topicNames.forEach(topicName ->
+                    topicNameCount.compute(topicName, Utils::incValue)
+                )
+            );
+
+            List<Action> actions = 
topicNameCount.entrySet().stream().map(entry -> {
+                ResourcePattern resource = new ResourcePattern(TOPIC, 
entry.getKey(), LITERAL);
+                return new Action(DESCRIBE, resource, entry.getValue(), true, 
true);
+            }).collect(Collectors.toList());
+
+            List<AuthorizationResult> authorizationResults = 
authorizer.get().authorize(context, actions);
+            Set<String> deniedTopics = new HashSet<>();
+            IntStream.range(0, actions.size()).forEach(i -> {
+                if (authorizationResults.get(i) == AuthorizationResult.DENIED) 
{
+                    String deniedTopic = 
actions.get(i).resourcePattern().name();
+                    deniedTopics.add(deniedTopic);
+                }
+            });
+
+            resolvedRegexes.forEach((__, topicNames) -> 
topicNames.removeIf(deniedTopics::contains));

Review Comment:
   nit: You may be able to use removeAll.



##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -522,6 +524,13 @@ public CoordinatorShardBuilder<MockCoordinatorShard, 
String> withTopicPartition(
             return this;
         }
 
+        @Override
+        public CoordinatorShardBuilder<MockCoordinatorShard, String> 
withAuthorizer(
+            Optional<Authorizer> authorizer
+        ) {
+            return this;
+        }
+

Review Comment:
   Can we remove this one now?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,6 +2607,28 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.
+          val updatedGroups = response.groups.asScala.map { group => {
+            val topicsToCheck = group.members.asScala.flatMap(member =>
+              List(member.assignment, 
member.targetAssignment).flatMap(_.topicPartitions.asScala.map(_.topicName)))
+
+            val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,

Review Comment:
   I let a comment in our previous thread about this one.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,6 +2607,32 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.
+          var topicsToCheck = Set[String]()
+          response.groups.forEach(_.members.forEach { member =>
+            List(member.assignment, member.targetAssignment).foreach { 
assignment =>
+              assignment.topicPartitions.asScala.foreach { tp =>
+                topicsToCheck += tp.topicName
+              }
+            }
+          })
+          val authorizedTopics = 
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,

Review Comment:
   @dongnuo123 I am sorry. You were right. I think that I misunderstood the 
code last night. I should not work in the evening... Aggregating topic names 
makes sense to reduce the number of authorize calls because we authorize based 
on the identify of the caller not based on the group. Then we check each 
individual group based on the authorized topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to