lianetm commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1824955585
########## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ########## @@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause assertTrue(produceException.isInstanceOf[TopicAuthorizationException]) assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala) + } - val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, listenerName = interBrokerListenerName) + + val consumer = createConsumer() + consumer.assign(List(topicPartition).asJava) + val consumeException = assertThrows(classOf[TopicAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) + + assertThrows(classOf[GroupAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + + // TODO: use background-event-queue-size metric to check there is background event + Thread.sleep(3000) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() + }) Review Comment: uhm I think we may be not getting the classic consumer behaviour right here. I would expect this assertion passes for the classic consumer here only because of how the test is written (never finds a coordinator), but that doesn't mean that the classic consumer does not throw topic or group auth exceptions on unsubscribe (which is what the changes in this PR are trying to achieve for the new consumer) 1. I believe classic does not throw GroupAuthException here only because it never finds a coordinator, so the unsubscribes perform no action on the group (but if it had had a known coordinator, the unsubscribe would send a leave group, if the classic checks the response I expect would fail with GroupAuthException, to double check...) 2. I believe classic does not throw TopicAuthException here only because unsubscribe does not poll the network client if it doesn't have a coordinator to send the leave group to. I could definitely be missing something, but we could validate my expectations with an integration test here: - consumer subscribes to a group successfully (has acls to READ + GROUP + "group-name-from-config") - looses the acls (using `removeAndVerifyAcls`) - consumer.unsubscribe -> I would expect that this should indeed throw a group authorization exception received in the response to the leave group, or the topic auth exception propagated from metadata (honestly not sure about the precedence, but both should be there) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org