lianetm commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1970551665
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16603,6 +16606,232 @@ foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1) ); } + @Test + public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + Authorizer authorizer = mock(Authorizer.class); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(12345L)) + .withAuthorizer(authorizer) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .build()) + .withSubscriptionMetadata(Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .withResolvedRegularExpression("foo*", new ResolvedRegularExpression( + Set.of(fooTopicName), 0L, 0L)) + .withAssignmentEpoch(10)) + .build(); + + // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS + context.time.sleep(10001L); + + Map<String, AuthorizationResult> acls = new HashMap<>(); + acls.put(fooTopicName, AuthorizationResult.ALLOWED); + acls.put(barTopicName, AuthorizationResult.DENIED); + when(authorizer.authorize(any(), any())).thenAnswer(invocation -> { + List<Action> actions = invocation.getArgument(1, List.class); + return actions.stream() + .map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED)) + .collect(Collectors.toList()); + }); + + // Member 2 heartbeats with a different regular expression. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") Review Comment: just for the record, this makes me notice that we require a HB with regex in this case (and makes sense, because IIRC we only refresh it if it changes or if there is a new topic metadata image, which is not the case here). I remember there was already a jira/intention to consider time-based refresh, which I expect would help improve the experience in this case. I will find the jira (or file) and update here just for reference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org