dajac commented on code in PR #18989:
URL: https://github.com/apache/kafka/pull/18989#discussion_r1965113944


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleConsumerGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val consumerGroupHeartbeatRequest = 
request.body[ConsumerGroupHeartbeatRequest]
+    var future = CompletableFuture.completedFuture[Unit](())
 
     if (!isConsumerGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
       // new one is not enabled, we fail directly here.
       requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else if (!authHelper.authorize(request.context, READ, GROUP, 
consumerGroupHeartbeatRequest.data.groupId)) {
       requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
-      CompletableFuture.completedFuture[Unit](())
+    } else if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null 
&&
+      !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) {
+      // Check the authorization if the subscribed topic names are provided.
+      // Clients are not allowed to see topics that are not authorized for 
Describe.
+      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
DESCRIBE, TOPIC,
+        
consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala)(identity)
+      if (authorizedTopics.size < 
consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) {
+        requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED.exception))

Review Comment:
   I wonder whether we should provide a custom error message here. Have you 
considered it?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -70,7 +71,8 @@ private[group] class GroupCoordinatorAdapter(
 
   override def consumerGroupHeartbeat(
     context: RequestContext,
-    request: ConsumerGroupHeartbeatRequestData
+    request: ConsumerGroupHeartbeatRequestData,
+    authorizer: Optional[Authorizer]

Review Comment:
   This looks wrong to me. I would prefer to pass the `Authorizer` when 
the `GroupCoordinatorService` is constructed. Then, we can pass it to the 
`GroupMetadataManager`.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleConsumerGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val consumerGroupHeartbeatRequest = 
request.body[ConsumerGroupHeartbeatRequest]
+    var future = CompletableFuture.completedFuture[Unit](())

Review Comment:
   nit: I would prefer keeping the previous approach to avoid this mutable 
variable.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleConsumerGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val consumerGroupHeartbeatRequest = 
request.body[ConsumerGroupHeartbeatRequest]
+    var future = CompletableFuture.completedFuture[Unit](())
 
     if (!isConsumerGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
       // new one is not enabled, we fail directly here.
       requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else if (!authHelper.authorize(request.context, READ, GROUP, 
consumerGroupHeartbeatRequest.data.groupId)) {
       requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
-      CompletableFuture.completedFuture[Unit](())
+    } else if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null 
&&
+      !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) {
+      // Check the authorization if the subscribed topic names are provided.
+      // Clients are not allowed to see topics that are not authorized for 
Describe.
+      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
DESCRIBE, TOPIC,
+        
consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala)(identity)
+      if (authorizedTopics.size < 
consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) {
+        requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED.exception))
+      }

Review Comment:
   If we go into this branch, how is the group coordinator called when all the 
subscribed topic names are authorized?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2592,6 +2602,25 @@ class KafkaApis(val requestChannel: RequestChannel,
             response.groups.addAll(results)
           }
 
+          // Clients are not allowed to see topics that are not authorized for 
Describe.

Review Comment:
   In my opinion, we could also return `TOPIC_AUTHORIZATION_FAILED` for this 
API but without mentioning the topics which are disallowed.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2529,11 +2547,56 @@ private boolean maybeUpdateRegularExpressions(
                 () -> refreshRegularExpressions(groupId, log, time, 
metadataImage, regexes),
                 (result, exception) -> handleRegularExpressionsResult(groupId, 
result, exception)
             );
+        } else if (isNotEmpty(newSubscribedTopicRegex)) {
+            throwIfTopicDescribeAuthorizationDenied(
+                context,
+                updatedMember.memberId(),
+                authorizer,
+                
group.resolvedRegularExpression(newSubscribedTopicRegex).get().topics
+            );

Review Comment:
   This does not work. Let's discuss it offline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to