PoAn Yang created KAFKA-17581:
---------------------------------

             Summary: AsyncKafkaConsumer can't unsubscribe invalid topics
                 Key: KAFKA-17581
                 URL: https://issues.apache.org/jira/browse/KAFKA-17581
             Project: Kafka
          Issue Type: Bug
            Reporter: PoAn Yang
            Assignee: PoAn Yang


When a consumer subscribes to an invalid topic name such as " this is test", the 
classic consumer can unsubscribe without error, but the async consumer 
(AsyncKafkaConsumer) cannot. The following integration test reproduces the problem:

 
{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
  // Invalid topic name due to space
  val invalidTopicName = "topic abc"
  val consumer = createConsumer()

  consumer.subscribe(List(invalidTopicName).asJava)

  var exception : InvalidTopicException = null
  TestUtils.waitUntilTrue(() => {
    try consumer.poll(Duration.ofMillis(500)) catch {
      case e : InvalidTopicException => exception = e
      case e : Throwable => fail(s"An InvalidTopicException should be thrown. But ${e.getClass} is thrown")
    }
    exception != null
  }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")

  assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
  // AsyncKafkaConsumer sends requests in a background thread. Wait long enough
  // for it to send the next request.
  Thread.sleep(1000)
  assertDoesNotThrow(new Executable {
    override def execute(): Unit = consumer.unsubscribe()
  })
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
