[ 
https://issues.apache.org/jira/browse/KAFKA-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613082#comment-16613082
 ] 

ASF GitHub Bot commented on KAFKA-7394:
---------------------------------------

hachikuji closed pull request #5634: KAFKA-7394; OffsetsForLeaderEpoch supports 
topic describe access
URL: https://github.com/apache/kafka/pull/5634
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff once a pull request from a fork has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index 839579bccb2..d729dadcb48 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -194,15 +194,16 @@ class Replica(val brokerId: Int,
 
   override def toString: String = {
     val replicaString = new StringBuilder
-    replicaString.append("ReplicaId: " + brokerId)
-    replicaString.append("; Topic: " + topicPartition.topic)
-    replicaString.append("; Partition: " + topicPartition.partition)
-    replicaString.append("; isLocal: " + isLocal)
-    replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
+    replicaString.append("Replica(replicaId=" + brokerId)
+    replicaString.append(s", topic=${topicPartition.topic}")
+    replicaString.append(s", partition=${topicPartition.partition}")
+    replicaString.append(s", isLocal=$isLocal")
+    replicaString.append(s", lastCaughtUpTimeMs=$lastCaughtUpTimeMs")
     if (isLocal) {
-      replicaString.append("; Highwatermark: " + highWatermark)
-      replicaString.append("; LastStableOffset: " + lastStableOffset)
+      replicaString.append(s", highWatermark=$highWatermark")
+      replicaString.append(s", lastStableOffset=$lastStableOffset")
     }
+    replicaString.append(")")
     replicaString.toString
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5a82cc4530a..afbe5b88204 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -181,7 +181,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+    if (isAuthorizedClusterAction(request)) {
       val response = replicaManager.becomeLeaderOrFollower(correlationId, 
leaderAndIsrRequest, onLeadershipChange)
       sendResponseExemptThrottle(request, response)
     } else {
@@ -196,7 +196,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.body[StopReplicaRequest]
 
-    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+    if (isAuthorizedClusterAction(request)) {
       val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
       // Clearing out the cache for groups that belong to an offsets topic 
partition for which this broker was the leader,
       // since this broker is no longer a replica for that offsets topic 
partition.
@@ -223,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val updateMetadataRequest = request.body[UpdateMetadataRequest]
 
-    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+    if (isAuthorizedClusterAction(request)) {
       val deletedPartitions = 
replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
       if (deletedPartitions.nonEmpty)
         groupCoordinator.handleDeletedPartitions(deletedPartitions)
@@ -2038,12 +2038,27 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit 
= {
     val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
-    val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()
-    authorizeClusterAction(request)
+    val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition.asScala
+
+    // The OffsetsForLeaderEpoch API was initially only used for inter-broker 
communication and required
+    // cluster permission. With KIP-320, the consumer now also uses this API 
to check for log truncation
+    // following a leader change, so we also allow topic describe permission.
+    val (authorizedPartitions, unauthorizedPartitions) = if 
(isAuthorizedClusterAction(request)) {
+      (requestInfo, Map.empty[TopicPartition, 
OffsetsForLeaderEpochRequest.PartitionData])
+    } else {
+      requestInfo.partition {
+        case (tp, _) => authorize(request.session, Describe, Resource(Topic, 
tp.topic, LITERAL))
+      }
+    }
 
-    val lastOffsetForLeaderEpoch = 
replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
+    val endOffsetsForAuthorizedPartitions = 
replicaManager.lastOffsetForLeaderEpoch(authorizedPartitions)
+    val endOffsetsForUnauthorizedPartitions = 
unauthorizedPartitions.mapValues(_ =>
+      new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, 
EpochEndOffset.UNDEFINED_EPOCH,
+        EpochEndOffset.UNDEFINED_EPOCH_OFFSET))
+
+    val endOffsetsForAllPartitions = endOffsetsForAuthorizedPartitions ++ 
endOffsetsForUnauthorizedPartitions
     sendResponseMaybeThrottle(request, requestThrottleMs =>
-      new OffsetsForLeaderEpochResponse(requestThrottleMs, 
lastOffsetForLeaderEpoch))
+      new OffsetsForLeaderEpochResponse(requestThrottleMs, 
endOffsetsForAllPartitions.asJava))
   }
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -2247,10 +2262,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
-    if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
+    if (!isAuthorizedClusterAction(request))
       throw new ClusterAuthorizationException(s"Request $request is not 
authorized.")
   }
 
+  private def isAuthorizedClusterAction(request: RequestChannel.Request): 
Boolean = {
+    authorize(request.session, ClusterAction, Resource.ClusterResource)
+  }
+
   def authorizeClusterAlter(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, Alter, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not 
authorized.")
@@ -2283,7 +2302,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def handleError(request: RequestChannel.Request, e: Throwable) {
     val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || 
!request.header.apiKey.clusterAction
-    error("Error when handling request 
%s".format(request.body[AbstractRequest]), e)
+    error("Error when handling request: " +
+      s"clientId=${request.header.clientId}, " +
+      s"correlationId=${request.header.correlationId}, " +
+      s"api=${request.header.apiKey}, " +
+      s"body=${request.body[AbstractRequest]}", e)
     if (mayThrottle)
       sendErrorResponseMaybeThrottle(request, e)
     else
@@ -2361,4 +2384,5 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def sendResponse(response: RequestChannel.Response): Unit = {
     requestChannel.sendResponse(response)
   }
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d61240e3af6..114e69c4275 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1494,4 +1494,3 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 }
-
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cfc6b5e43d7..dbe2a02e811 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -211,7 +211,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.CREATE_TOPICS -> topicCreateAcl,
     ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
     ApiKeys.DELETE_RECORDS -> topicDeleteAcl,
-    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
+    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> topicDescribeAcl,
     ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
     ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl,
     ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ 
clusterIdempotentWriteAcl),
@@ -277,7 +277,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       build()
   }
 
-  private def offsetsForLeaderEpochRequest = {
+  private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
     new 
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion)
       .add(tp, Optional.of(27), 7).build()
   }
@@ -382,7 +382,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def addOffsetsToTxnRequest = new 
AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
 
-
   @Test
   def testAuthorizationWithTopicExisting() {
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -400,7 +399,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.HEARTBEAT -> heartbeatRequest,
       ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
       ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
-      ApiKeys.STOP_REPLICA -> stopReplicaRequest,
       ApiKeys.CONTROLLED_SHUTDOWN -> controlledShutdownRequest,
       ApiKeys.CREATE_TOPICS -> createTopicsRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
@@ -415,7 +413,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
       ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
-      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest
+      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+
+      // Check StopReplica last since some APIs depend on replica availability
+      ApiKeys.STOP_REPLICA -> stopReplicaRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -426,7 +427,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       val resourceToAcls = requestKeysToAcls(key)
       resourceToAcls.get(topicResource).foreach { acls =>
         val describeAcls = topicDescribeAcl(topicResource)
-        val isAuthorized =  describeAcls == acls
+        val isAuthorized = describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
         sendRequestAndVerifyResponseError(key, request, resources, 
isAuthorized = isAuthorized)
         removeAllAcls()
@@ -460,7 +461,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
       ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
-      ApiKeys.DELETE_GROUPS -> deleteGroupsRequest
+      ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -504,11 +506,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val resources = Set(topicResource.resourceType, 
Resource.ClusterResource.resourceType)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
false)
 
-    val readAcls = topicReadAcl.get(topicResource).get
+    val readAcls = topicReadAcl(topicResource)
     addAndVerifyAcls(readAcls, topicResource)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
false)
 
-    val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
+    val clusterAcls = clusterAcl(Resource.ClusterResource)
+    addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
true)
+  }
+
+  @Test
+  def testOffsetsForLeaderEpochClusterPermission(): Unit = {
+    val key = ApiKeys.OFFSET_FOR_LEADER_EPOCH
+    val request = offsetsForLeaderEpochRequest
+
+    removeAllAcls()
+
+    val resources = Set(topicResource.resourceType, 
Resource.ClusterResource.resourceType)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
false)
+
+    // Although the OffsetsForLeaderEpoch API now accepts topic describe, we 
should continue
+    // allowing cluster action for backwards compatibility
+    val clusterAcls = clusterAcl(Resource.ClusterResource)
     addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = 
true)
   }
@@ -1010,18 +1029,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
-  def testListOffsetsWithNoTopicAccess() {
+  def testMetadataWithNoTopicAccess() {
     val consumer = createConsumer()
     consumer.partitionsFor(topic)
   }
 
   @Test
-  def testListOffsetsWithTopicDescribe() {
+  def testMetadataWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
     val consumer = createConsumer()
     consumer.partitionsFor(topic)
   }
 
+  @Test(expected = classOf[TopicAuthorizationException])
+  def testListOffsetsWithNoTopicAccess() {
+    val consumer = createConsumer()
+    consumer.endOffsets(Set(tp).asJava)
+  }
+
+  @Test
+  def testListOffsetsWithTopicDescribe() {
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
+    val consumer = createConsumer()
+    consumer.endOffsets(Set(tp).asJava)
+  }
+
   @Test
   def testDescribeGroupApiWithNoGroupAcl() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Describe)), topicResource)
@@ -1434,7 +1466,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val resp = connectAndSend(request, apiKey)
     val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", 
classOf[ByteBuffer], classOf[Short]).invoke(
       null, resp, request.version: 
java.lang.Short).asInstanceOf[AbstractResponse]
-    val error = requestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => 
Errors](response)
+    val error = requestKeyToError(apiKey).asInstanceOf[AbstractResponse => 
Errors](response)
 
     val authorizationErrors = resources.flatMap { resourceType =>
       if (resourceType == Topic) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Allow OffsetsForLeaderEpoch requests with topic describe ACL (KIP-320)
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7394
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7394
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>
> As part of KIP-320, clients will be allowed to send the OffsetsForLeaderEpoch 
> request. Currently this API requires the ClusterAction permission on the 
> cluster resource. We need to extend it so that the Describe permission on 
> the topic is also accepted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to