JIRA created https://issues.apache.org/jira/browse/KAFKA-1883 and patch submitted for review. Thanks, Guozhang.

-Jaikiran

On Tuesday 20 January 2015 05:53 AM, Guozhang Wang wrote:
Hi Jaikiran,

This is a real bug; could you file a JIRA?

As for the fix, I think your proposal would be the right way to fix it.

Guozhang

On Mon, Jan 19, 2015 at 9:07 AM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

I often see the following exception while running some tests
(ProducerFailureHandlingTest.testNoResponse is one such instance):


[2015-01-19 22:30:24,257] ERROR [Controller-0-to-broker-1-send-thread], Controller 0 fails to send a request to broker id:1,host:localhost,port:56729 (kafka.controller.RequestSendThread:103)
java.lang.NullPointerException
     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:150)
     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


Looking at the code in question, I can see that the NPE can be triggered
when "receive" is null, which can happen if "isRunning" is false
(i.e. a shutdown has been requested). The fix to prevent this seems
straightforward:

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..10f4c5a 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int,
                Utils.swallow(Thread.sleep(300))
            }
          }
-        var response: RequestOrResponse = null
-        request.requestId.get match {
-          case RequestKeys.LeaderAndIsrKey =>
-            response = LeaderAndIsrResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaKey =>
-            response = StopReplicaResponse.readFrom(receive.buffer)
-          case RequestKeys.UpdateMetadataKey =>
-            response = UpdateMetadataResponse.readFrom(receive.buffer)
-        }
-        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-                                  .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+        if (receive != null) {
+          var response: RequestOrResponse = null
+          request.requestId.get match {
+            case RequestKeys.LeaderAndIsrKey =>
+              response = LeaderAndIsrResponse.readFrom(receive.buffer)
+            case RequestKeys.StopReplicaKey =>
+              response = StopReplicaResponse.readFrom(receive.buffer)
+            case RequestKeys.UpdateMetadataKey =>
+              response = UpdateMetadataResponse.readFrom(receive.buffer)
+          }
+          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+            .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))

-        if(callback != null) {
-          callback(response)
+          if (callback != null) {
+            callback(response)
+          }
          }
        }
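For comparison, here is a minimal, self-contained Scala sketch of the same control flow that uses Option instead of a null check. It is a simplified model, not Kafka's code: `Receive`, `Channel`, `SendLoop`, and `sendWithRetry` are hypothetical names, and a failed send is simply retried instead of sleeping and reconnecting (the `Thread.sleep(300)` visible in the diff context). What it shows is that when shutdown is requested while the loop is still waiting for a response, there is nothing to parse, so the response handling and the callback are skipped.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-ins for the controller's channel and response types.
final case class Receive(payload: String)

trait Channel {
  // Returns the broker's response, or throws IOException if the send fails.
  def sendAndReceive(request: String): Receive
}

class SendLoop(channel: Channel, isRunning: AtomicBoolean) {
  // Keeps retrying until a response arrives or shutdown is requested.
  // None means "stopped before any response was received", which is the
  // situation that leaves `receive` null in the original loop.
  def sendWithRetry(request: String): Option[Receive] = {
    var receive: Option[Receive] = None
    while (receive.isEmpty && isRunning.get()) {
      try {
        receive = Some(channel.sendAndReceive(request))
      } catch {
        case _: IOException => () // simplified: just retry
      }
    }
    receive
  }

  // Handles the response and invokes the callback only if a response exists.
  def doWork(request: String)(callback: Receive => Unit): Unit =
    sendWithRetry(request).foreach(callback)
}
```

In this model `sendWithRetry` returns `None` only when `isRunning` is false, which mirrors the case the null check in the patch guards against.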


However, can this really be considered a fix, or would it just hide the
real issue? Is there something more that needs to be done in this case?
I'm on trunk, FWIW.
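One way to keep a null check from masking a different problem is to make the expected invariant explicit: a missing response should only occur after shutdown has been requested. The Scala sketch below assumes that invariant; `handleReceive` is a hypothetical helper, not part of Kafka or of the patch above, and it fails loudly if a response is missing while the thread is still running.

```scala
// Hypothetical helper, not part of Kafka: a missing response is treated as
// benign only when shutdown has been requested; otherwise it is reported.
def handleReceive[R <: AnyRef](receive: R, isRunning: Boolean)(process: R => Unit): Unit = {
  if (receive != null) {
    process(receive)
  } else if (isRunning) {
    // A null receive while still running would point to a different bug,
    // so fail loudly instead of silently skipping it.
    throw new IllegalStateException("no response received although the send thread is still running")
  }
  // Otherwise shutdown was requested and there is nothing to process.
}
```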


-Jaikiran



