[
https://issues.apache.org/jira/browse/KAFKA-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613082#comment-16613082
]
ASF GitHub Bot commented on KAFKA-7394:
---------------------------------------
hachikuji closed pull request #5634: KAFKA-7394; OffsetsForLeaderEpoch supports
topic describe access
URL: https://github.com/apache/kafka/pull/5634
This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala
b/core/src/main/scala/kafka/cluster/Replica.scala
index 839579bccb2..d729dadcb48 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -194,15 +194,16 @@ class Replica(val brokerId: Int,
override def toString: String = {
val replicaString = new StringBuilder
- replicaString.append("ReplicaId: " + brokerId)
- replicaString.append("; Topic: " + topicPartition.topic)
- replicaString.append("; Partition: " + topicPartition.partition)
- replicaString.append("; isLocal: " + isLocal)
- replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
+ replicaString.append("Replica(replicaId=" + brokerId)
+ replicaString.append(s", topic=${topicPartition.topic}")
+ replicaString.append(s", partition=${topicPartition.partition}")
+ replicaString.append(s", isLocal=$isLocal")
+ replicaString.append(s", lastCaughtUpTimeMs=$lastCaughtUpTimeMs")
if (isLocal) {
- replicaString.append("; Highwatermark: " + highWatermark)
- replicaString.append("; LastStableOffset: " + lastStableOffset)
+ replicaString.append(s", highWatermark=$highWatermark")
+ replicaString.append(s", lastStableOffset=$lastStableOffset")
}
+ replicaString.append(")")
replicaString.toString
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5a82cc4530a..afbe5b88204 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -181,7 +181,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ if (isAuthorizedClusterAction(request)) {
val response = replicaManager.becomeLeaderOrFollower(correlationId,
leaderAndIsrRequest, onLeadershipChange)
sendResponseExemptThrottle(request, response)
} else {
@@ -196,7 +196,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ if (isAuthorizedClusterAction(request)) {
val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
// Clearing out the cache for groups that belong to an offsets topic
partition for which this broker was the leader,
// since this broker is no longer a replica for that offsets topic
partition.
@@ -223,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body[UpdateMetadataRequest]
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ if (isAuthorizedClusterAction(request)) {
val deletedPartitions =
replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
if (deletedPartitions.nonEmpty)
groupCoordinator.handleDeletedPartitions(deletedPartitions)
@@ -2038,12 +2038,27 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit
= {
val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
- val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()
- authorizeClusterAction(request)
+ val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition.asScala
+
+ // The OffsetsForLeaderEpoch API was initially only used for inter-broker
communication and required
+ // cluster permission. With KIP-320, the consumer now also uses this API
to check for log truncation
+ // following a leader change, so we also allow topic describe permission.
+ val (authorizedPartitions, unauthorizedPartitions) = if
(isAuthorizedClusterAction(request)) {
+ (requestInfo, Map.empty[TopicPartition,
OffsetsForLeaderEpochRequest.PartitionData])
+ } else {
+ requestInfo.partition {
+ case (tp, _) => authorize(request.session, Describe, Resource(Topic,
tp.topic, LITERAL))
+ }
+ }
- val lastOffsetForLeaderEpoch =
replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
+ val endOffsetsForAuthorizedPartitions =
replicaManager.lastOffsetForLeaderEpoch(authorizedPartitions)
+ val endOffsetsForUnauthorizedPartitions =
unauthorizedPartitions.mapValues(_ =>
+ new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED,
EpochEndOffset.UNDEFINED_EPOCH,
+ EpochEndOffset.UNDEFINED_EPOCH_OFFSET))
+
+ val endOffsetsForAllPartitions = endOffsetsForAuthorizedPartitions ++
endOffsetsForUnauthorizedPartitions
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new OffsetsForLeaderEpochResponse(requestThrottleMs,
lastOffsetForLeaderEpoch))
+ new OffsetsForLeaderEpochResponse(requestThrottleMs,
endOffsetsForAllPartitions.asJava))
}
def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -2247,10 +2262,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
- if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
+ if (!isAuthorizedClusterAction(request))
throw new ClusterAuthorizationException(s"Request $request is not
authorized.")
}
+ private def isAuthorizedClusterAction(request: RequestChannel.Request):
Boolean = {
+ authorize(request.session, ClusterAction, Resource.ClusterResource)
+ }
+
def authorizeClusterAlter(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, Alter, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not
authorized.")
@@ -2283,7 +2302,11 @@ class KafkaApis(val requestChannel: RequestChannel,
private def handleError(request: RequestChannel.Request, e: Throwable) {
val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] ||
!request.header.apiKey.clusterAction
- error("Error when handling request
%s".format(request.body[AbstractRequest]), e)
+ error("Error when handling request: " +
+ s"clientId=${request.header.clientId}, " +
+ s"correlationId=${request.header.correlationId}, " +
+ s"api=${request.header.apiKey}, " +
+ s"body=${request.body[AbstractRequest]}", e)
if (mayThrottle)
sendErrorResponseMaybeThrottle(request, e)
else
@@ -2361,4 +2384,5 @@ class KafkaApis(val requestChannel: RequestChannel,
private def sendResponse(response: RequestChannel.Response): Unit = {
requestChannel.sendResponse(response)
}
+
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d61240e3af6..114e69c4275 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1494,4 +1494,3 @@ class ReplicaManager(val config: KafkaConfig,
}
}
}
-
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cfc6b5e43d7..dbe2a02e811 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -211,7 +211,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_TOPICS -> topicCreateAcl,
ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
ApiKeys.DELETE_RECORDS -> topicDeleteAcl,
- ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> topicDescribeAcl,
ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl,
ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++
clusterIdempotentWriteAcl),
@@ -277,7 +277,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
build()
}
- private def offsetsForLeaderEpochRequest = {
+ private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
new
OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion)
.add(tp, Optional.of(27), 7).build()
}
@@ -382,7 +382,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def addOffsetsToTxnRequest = new
AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
-
@Test
def testAuthorizationWithTopicExisting() {
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -400,7 +399,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.HEARTBEAT -> heartbeatRequest,
ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
- ApiKeys.STOP_REPLICA -> stopReplicaRequest,
ApiKeys.CONTROLLED_SHUTDOWN -> controlledShutdownRequest,
ApiKeys.CREATE_TOPICS -> createTopicsRequest,
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
@@ -415,7 +413,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
- ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest
+ ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+
+ // Check StopReplica last since some APIs depend on replica availability
+ ApiKeys.STOP_REPLICA -> stopReplicaRequest
)
for ((key, request) <- requestKeyToRequest) {
@@ -426,7 +427,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
val describeAcls = topicDescribeAcl(topicResource)
- val isAuthorized = describeAcls == acls
+ val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources,
isAuthorized = isAuthorized)
removeAllAcls()
@@ -460,7 +461,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
- ApiKeys.DELETE_GROUPS -> deleteGroupsRequest
+ ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
)
for ((key, request) <- requestKeyToRequest) {
@@ -504,11 +506,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val resources = Set(topicResource.resourceType,
Resource.ClusterResource.resourceType)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized =
false)
- val readAcls = topicReadAcl.get(topicResource).get
+ val readAcls = topicReadAcl(topicResource)
addAndVerifyAcls(readAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized =
false)
- val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
+ val clusterAcls = clusterAcl(Resource.ClusterResource)
+ addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized =
true)
+ }
+
+ @Test
+ def testOffsetsForLeaderEpochClusterPermission(): Unit = {
+ val key = ApiKeys.OFFSET_FOR_LEADER_EPOCH
+ val request = offsetsForLeaderEpochRequest
+
+ removeAllAcls()
+
+ val resources = Set(topicResource.resourceType,
Resource.ClusterResource.resourceType)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized =
false)
+
+ // Although the OffsetsForLeaderEpoch API now accepts topic describe, we
should continue
+ // allowing cluster action for backwards compatibility
+ val clusterAcls = clusterAcl(Resource.ClusterResource)
addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized =
true)
}
@@ -1010,18 +1029,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest
{
}
@Test(expected = classOf[TopicAuthorizationException])
- def testListOffsetsWithNoTopicAccess() {
+ def testMetadataWithNoTopicAccess() {
val consumer = createConsumer()
consumer.partitionsFor(topic)
}
@Test
- def testListOffsetsWithTopicDescribe() {
+ def testMetadataWithTopicDescribe() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost,
Describe)), topicResource)
val consumer = createConsumer()
consumer.partitionsFor(topic)
}
+ @Test(expected = classOf[TopicAuthorizationException])
+ def testListOffsetsWithNoTopicAccess() {
+ val consumer = createConsumer()
+ consumer.endOffsets(Set(tp).asJava)
+ }
+
+ @Test
+ def testListOffsetsWithTopicDescribe() {
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost,
Describe)), topicResource)
+ val consumer = createConsumer()
+ consumer.endOffsets(Set(tp).asJava)
+ }
+
@Test
def testDescribeGroupApiWithNoGroupAcl() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost,
Describe)), topicResource)
@@ -1434,7 +1466,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val resp = connectAndSend(request, apiKey)
val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse",
classOf[ByteBuffer], classOf[Short]).invoke(
null, resp, request.version:
java.lang.Short).asInstanceOf[AbstractResponse]
- val error = requestKeyToError(apiKey).asInstanceOf[(AbstractResponse) =>
Errors](response)
+ val error = requestKeyToError(apiKey).asInstanceOf[AbstractResponse =>
Errors](response)
val authorizationErrors = resources.flatMap { resourceType =>
if (resourceType == Topic) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Allow OffsetsForLeaderEpoch requests with topic describe ACL (KIP-320)
> ----------------------------------------------------------------------
>
> Key: KAFKA-7394
> URL: https://issues.apache.org/jira/browse/KAFKA-7394
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> As part of KIP-320, we will allow the OffsetsForLeaderEpoch request to be
> sent from clients. Currently this API is protected with the cluster resource.
> We need to extend support to also allow Topic Describe.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)