dajac commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1376679603
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1500,6 +1502,8 @@ class KafkaApis(val requestChannel: RequestChannel,
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(offsetFetchRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
+ } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) {
Review Comment:
ditto: we could remove the `()` after `code` here as well.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1461,6 +1461,8 @@ class KafkaApis(val requestChannel: RequestChannel,
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(offsetFetchRequest.groupId)
.setErrorCode(Errors.forException(exception).code)
+ } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) {
Review Comment:
nit: We could remove the `()` after `code`.
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4701,6 +4751,89 @@ class KafkaApisTest {
assertEquals(expectedOffsetFetchResponse, response.data)
}
+ @Test
+ def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
+ def makeRequest(version: Short): RequestChannel.Request = {
+ val groups = Map(
+ "group-1" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 0)
+ ).asJava,
+ "group-2" -> List(
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 0)
+ ).asJava
+ ).asJava
+ buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+ }
+
+ val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+
+ val acls = Map(
+ "group-1" -> AuthorizationResult.ALLOWED,
+ "group-2" -> AuthorizationResult.ALLOWED,
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.ALLOWED
+ )
+
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+
+ // group-1 and group-2 are allowed and bar is allowed.
+ val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+ when(groupCoordinator.fetchOffsets(
+ requestChannelRequest.context,
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-1")
+ .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+ false
+ )).thenReturn(group1Future)
+
+ val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+ when(groupCoordinator.fetchOffsets(
+ requestChannelRequest.context,
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("group-2")
+ .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+ false
+ )).thenReturn(group1Future)
+
+ createKafkaApis(authorizer = Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ // group-2 mocks using the new group coordinator.
+ // When the coordinator is not active, a response with error code is returned.
Review Comment:
Should we add a note about `foo` here? The whole point of this test is to
ensure that the failed topics are not present in the response when there is a
top level error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]