dajac commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1965113944
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest] + var future = CompletableFuture.completedFuture[Unit](()) if (!isConsumerGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) } else if (!authHelper.authorize(request.context, READ, GROUP, consumerGroupHeartbeatRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) - CompletableFuture.completedFuture[Unit](()) + } else if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null && + !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) { + // Check the authorization if the subscribed topic names are provided. + // Clients are not allowed to see topics that are not authorized for Describe. + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala)(identity) + if (authorizedTopics.size < consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) { + requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED.exception)) Review Comment: I wonder whether we should put a custom error message. Have you considered it? 
########## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ########## @@ -70,7 +71,8 @@ private[group] class GroupCoordinatorAdapter( override def consumerGroupHeartbeat( context: RequestContext, - request: ConsumerGroupHeartbeatRequestData + request: ConsumerGroupHeartbeatRequestData, + authorizer: Optional[Authorizer] Review Comment: This looks wrong to me. I would prefer to pass the `Authorizer` when the `GroupCoordinatorService` is constructed. Then, we can pass it to the `GroupMetadataManager`. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest] + var future = CompletableFuture.completedFuture[Unit](()) Review Comment: nit: I would prefer keeping the previous approach in order to avoid this mutable variable. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest] + var future = CompletableFuture.completedFuture[Unit](()) if (!isConsumerGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. 
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) } else if (!authHelper.authorize(request.context, READ, GROUP, consumerGroupHeartbeatRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) - CompletableFuture.completedFuture[Unit](()) + } else if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null && + !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) { + // Check the authorization if the subscribed topic names are provided. + // Clients are not allowed to see topics that are not authorized for Describe. + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala)(identity) + if (authorizedTopics.size < consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) { + requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED.exception)) + } Review Comment: If we go in this branch, how is the group coordinator called if all the subscribed topic names are OK? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2592,6 +2602,25 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. Review Comment: In my opinion, we could also return `TOPIC_AUTHORIZATION_FAILED` for this API but without mentioning the topics which are disallowed. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2529,11 +2547,56 @@ private boolean maybeUpdateRegularExpressions( () -> refreshRegularExpressions(groupId, log, time, metadataImage, regexes), (result, exception) -> handleRegularExpressionsResult(groupId, result, exception) ); + } else if (isNotEmpty(newSubscribedTopicRegex)) { + throwIfTopicDescribeAuthorizationDenied( + context, + updatedMember.memberId(), + authorizer, + group.resolvedRegularExpression(newSubscribedTopicRegex).get().topics + ); Review Comment: This does not work. Let's discuss it offline. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org