lianetm commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1824955585


##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
       () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause
     assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
     assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+  }
 
-    val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    val consumer = createConsumer()
+    consumer.assign(List(topicPartition).asJava)
+    val consumeException = assertThrows(classOf[TopicAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+    assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+    assertThrows(classOf[GroupAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+    // TODO: use background-event-queue-size metric to check there is background event
+    Thread.sleep(3000)
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })

Review Comment:
   I think we may not be capturing the classic consumer's behaviour correctly 
here. I would expect this assertion to pass for the classic consumer only 
because of how the test is written (the consumer never finds a coordinator). 
That does not mean the classic consumer does not throw topic or group 
authorization exceptions on unsubscribe (not throwing is what the changes in 
this PR are trying to achieve for the new consumer).
   
   1. I believe classic does not throw GroupAuthorizationException here only 
because it never finds a coordinator, so unsubscribe performs no action on the 
group. If it had a known coordinator, unsubscribe would send a leave group 
request, and if the classic consumer checks the response, I would expect it to 
fail with GroupAuthorizationException (to be double-checked).
   2. I believe classic does not throw TopicAuthorizationException here only 
because unsubscribe does not poll the network client when it has no coordinator 
to send the leave group request to.
   
   I could definitely be missing something, but we could validate these 
expectations with an integration test here:
   
   - the consumer subscribes to a group successfully (it has acls to READ + 
GROUP + "group-name-from-config")
   - the consumer loses the acls (using `removeAndVerifyAcls`)
   - consumer.unsubscribe -> I would expect this to throw either a group 
authorization exception received in the response to the leave group request, or 
the topic authorization exception propagated from metadata (honestly not sure 
about the precedence, but both should be there)
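
   The three steps above could be sketched roughly as follows. This is only an 
outline meant to sit inside `GroupAuthorizerIntegrationTest`, not something 
compiled against this branch. The test name, the `group` value, and the 
signatures of `createAcl`, `addAndVerifyAcls` and `removeAndVerifyAcls` are 
assumptions and would need to match the real helpers. Since the precedence is 
unclear, the sketch accepts either exception rather than asserting a specific 
one.

```scala
import java.time.Duration
import org.apache.kafka.common.acl.{AclOperation, AclPermissionType}
import org.apache.kafka.common.errors.{AuthorizationException, GroupAuthorizationException, TopicAuthorizationException}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import scala.jdk.CollectionConverters._

// Assumed helpers on the test class (signatures are guesses, adjust as needed):
//   createAcl(op: AclOperation, permission: AclPermissionType): AccessControlEntry
//   addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit
//   removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testUnsubscribeAfterLosingAcls(quorum: String, groupProtocol: String): Unit = {
  val topic = "topic"
  val group = "group" // assumption: must equal the group.id used by createConsumer()
  createTopic(topic, listenerName = interBrokerListenerName)

  // Step 1: grant READ on the topic and on the group, then join the group.
  val readAcl = Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW))
  val topicResource = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
  val groupResource = new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
  addAndVerifyAcls(readAcl, topicResource)
  addAndVerifyAcls(readAcl, groupResource)

  val consumer = createConsumer()
  consumer.subscribe(List(topic).asJava)
  // Poll until partitions are assigned, so the coordinator is known.
  val deadlineMs = System.currentTimeMillis() + 15000L
  while (consumer.assignment().isEmpty) {
    assertTrue(System.currentTimeMillis() < deadlineMs, "consumer never joined the group")
    consumer.poll(Duration.ofMillis(100))
  }

  // Step 2: the consumer loses its acls.
  removeAndVerifyAcls(readAcl, topicResource)
  removeAndVerifyAcls(readAcl, groupResource)

  // Step 3: expectation to validate, with either exception accepted.
  val thrown = assertThrows(classOf[AuthorizationException], () => consumer.unsubscribe())
  assertTrue(thrown.isInstanceOf[GroupAuthorizationException] ||
    thrown.isInstanceOf[TopicAuthorizationException])
}
```

   If the classic consumer turns out not to throw in this scenario, the final 
assertion fails and that answers the question as well.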



