lianetm commented on code in PR #18989:
URL: https://github.com/apache/kafka/pull/18989#discussion_r1970551665


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16603,6 +16606,232 @@ foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1)
         );
     }
 
+    @Test
+    public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        Authorizer authorizer = mock(Authorizer.class);
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(12345L))
+            .withAuthorizer(authorizer)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withSubscriptionMetadata(Map.of(
+                    fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)))
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
+        context.time.sleep(10001L);
+
+        Map<String, AuthorizationResult> acls = new HashMap<>();
+        acls.put(fooTopicName, AuthorizationResult.ALLOWED);
+        acls.put(barTopicName, AuthorizationResult.DENIED);
+        when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
+            List<Action> actions = invocation.getArgument(1, List.class);
+            return actions.stream()
+                .map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
+                .collect(Collectors.toList());
+        });
+
+        // Member 2 heartbeats with a different regular expression.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("foo*|bar*")

Review Comment:
   Just for the record: this makes me notice that we require a HB with a regex in this case. That makes sense because, IIRC, we only refresh the resolved regex if the regex changes or if there is a new topic metadata image, which is not the case here.
   
   I remember there was already a Jira (or at least an intention) to consider a time-based refresh, which I expect would improve the experience in this case. I will find the Jira (or file one) and update here for reference.
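   
   For illustration only, a minimal sketch of the refresh condition described above, with a time-based refresh as an optional extra. Every name here (`RegexRefreshPolicy`, `shouldRefresh`, both interval parameters) is hypothetical and not taken from the group coordinator code. The assumption that a minimum interval gates every refresh is based only on the sleep in the test above.

```java
// Hypothetical sketch: none of these names exist in the Kafka code base.
final class RegexRefreshPolicy {
    // Assumed minimum time between two refreshes (the test sleeps past 10s).
    private final long minIntervalMs;
    // Assumed time-based refresh interval; a value <= 0 disables it.
    private final long maxIntervalMs;

    RegexRefreshPolicy(long minIntervalMs, long maxIntervalMs) {
        this.minIntervalMs = minIntervalMs;
        this.maxIntervalMs = maxIntervalMs;
    }

    boolean shouldRefresh(boolean regexChanged, boolean newMetadataImage, long msSinceLastRefresh) {
        // Never refresh more often than the minimum interval.
        if (msSinceLastRefresh <= minIntervalMs) {
            return false;
        }
        // Today's triggers, as recalled in the comment: a changed regex
        // or a new topic metadata image.
        if (regexChanged || newMetadataImage) {
            return true;
        }
        // Time-based trigger: would also pick up changes that touch neither
        // the regex nor the metadata image (an ACL change, for example).
        return maxIntervalMs > 0 && msSinceLastRefresh >= maxIntervalMs;
    }
}
```

   With the time-based trigger disabled, an unchanged regex and an unchanged metadata image never cause a refresh no matter how much time passes, which is why the test has to heartbeat with a different regex.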



