David Arthur created KAFKA-12777:
------------------------------------

             Summary: AutoTopicCreationManager does not handle response errors
                 Key: KAFKA-12777
                 URL: https://issues.apache.org/jira/browse/KAFKA-12777
             Project: Kafka
          Issue Type: Bug
            Reporter: David Arthur


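The request completion callback in AutoTopicCreationManager assumes the 
response body is present. When the request fails before a response is 
received (for example, on a version mismatch), response.responseBody is null 
and the callback throws a NullPointerException.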

{code:scala}
override def onComplete(response: ClientResponse): Unit = {
  debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
  clearInflightRequests(creatableTopics)
}
{code}

We should at least check whether the response exists before logging it and 
clearing the in-flight flag. 
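
As a rough illustration of that check, here is a minimal, self-contained sketch. It is not the actual fix: StubResponse is a hypothetical stand-in that models only a few fields of ClientResponse, and OnCompleteSketch, describe, the inflight set, and the log parameter are names invented for this example. The sketch also assumes that the in-flight state should be cleared whatever the outcome, so that a failed request does not leave topics marked as in flight.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ClientResponse; only the fields this sketch needs.
final case class StubResponse(
  responseBody: AnyRef,               // null when no response was received
  versionMismatch: Throwable,         // non-null on an unsupported-version failure
  authenticationException: Throwable  // non-null on an authentication failure
)

object OnCompleteSketch {
  // Build the log message without dereferencing a possibly-null body.
  def describe(topics: Iterable[String], response: StubResponse): String = {
    val names = topics.mkString(", ")
    if (response.authenticationException != null)
      s"Auto topic creation failed for $names: authentication error"
    else if (response.versionMismatch != null)
      s"Auto topic creation failed for $names: version mismatch"
    else
      Option(response.responseBody) match {
        case Some(body) => s"Auto topic creation completed for $names with response $body."
        case None       => s"Auto topic creation completed for $names without a response body."
      }
  }

  // Assumption: clear the in-flight entries first, so they are released even
  // if logging fails; `inflight` and `log` are illustrative parameters.
  def onComplete(
      topics: Iterable[String],
      response: StubResponse,
      inflight: mutable.Set[String],
      log: String => Unit
  ): Unit = {
    inflight --= topics
    log(describe(topics, response))
  }
}
```

Clearing the in-flight entries before logging is a design choice in this sketch: in the callback above, an exception thrown while building the log message prevents clearInflightRequests from running at all.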

This was found while debugging a separate issue. Here is a log snippet:

{code}
[2021-05-13 11:21:03,890] DEBUG [BrokerToControllerChannelManager broker=1 name=forwarding] Version mismatch when attempting to send EnvelopeRequestData(requestData=java.nio.HeapByteBuffer[pos=0 lim=43 cap=43], requestPrincipal=[0, 0, 5, 85, 115, 101, 114, 10, 65, 78, 79, 78, 89, 77, 79, 85, 83, 0, 0], clientHostAddress=[127, 0, 0, 1]) with correlation id 2 to 0 (org.apache.kafka.clients.NetworkClient:495)
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support ENVELOPE
[2021-05-13 11:21:03,893] ERROR [BrokerToControllerChannelManager broker=1 name=forwarding]: Request EnvelopeRequestData(requestData=java.nio.HeapByteBuffer[pos=0 lim=43 cap=43], requestPrincipal=[0, 0, 5, 85, 115, 101, 114, 10, 65, 78, 79, 78, 89, 77, 79, 85, 83, 0, 0], clientHostAddress=[127, 0, 0, 1]) failed due to unsupported version error (kafka.server.BrokerToControllerRequestThread:76)
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support ENVELOPE
[2021-05-13 11:21:03,894] ERROR [BrokerToControllerChannelManager broker=1 name=forwarding] Uncaught error in request completion: (org.apache.kafka.clients.NetworkClient:576)
java.lang.NullPointerException
        at kafka.server.DefaultAutoTopicCreationManager$$anon$1.$anonfun$onComplete$1(AutoTopicCreationManager.scala:179)
        at kafka.utils.Logging.debug(Logging.scala:62)
        at kafka.utils.Logging.debug$(Logging.scala:62)
        at kafka.server.DefaultAutoTopicCreationManager.debug(AutoTopicCreationManager.scala:67)
        at kafka.server.DefaultAutoTopicCreationManager$$anon$1.onComplete(AutoTopicCreationManager.scala:179)
        at kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManager.scala:355)
        at kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManager.scala:339)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:574)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
        at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:74)
        at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:374)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
