[
https://issues.apache.org/jira/browse/KAFKA-633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joel Koshy updated KAFKA-633:
-----------------------------
Attachment: KAFKA-633-v1.patch

This is a timing issue that affects low-volume topics (in this case, an
empty topic). The leader that is being shut down receives a
LeaderAndIsrRequest informing it that it is no longer the leader, so it starts
up a follower, which begins issuing fetch requests to the new leader. We then
shrink the ISR and send a StopReplicaRequest to the shutting-down broker.
However, the new leader, upon receiving the fetch request, expands the ISR again.
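
The ordering problem can be illustrated with a toy model. This is only a
sketch, not Kafka code: the class and function names are made up, and it
assumes three replicas (brokers 0, 1, 2), which is consistent with the
"expected:<2> but was:<3>" assertion but is not stated in this comment.

```python
# Toy model of the race; none of these names are Kafka APIs.
class Partition:
    def __init__(self, leader, isr):
        self.leader = leader
        self.isr = set(isr)

    def shrink_isr(self, broker):
        # Stand-in for the ISR shrink done during controlled shutdown.
        self.isr.discard(broker)

    def handle_fetch(self, follower):
        # On an empty topic there is nothing to catch up on, so a
        # fetching follower immediately counts as in sync.
        self.isr.add(follower)


def controlled_shutdown(partition, broker, new_leader, fetch_arrives_late):
    partition.leader = new_leader        # leadership is moved
    if not fetch_arrives_late:
        partition.handle_fetch(broker)   # fetch before the shrink: harmless
    partition.shrink_isr(broker)         # ISR is shrunk
    if fetch_arrives_late:
        partition.handle_fetch(broker)   # fetch after the shrink: ISR re-expands
    return partition


early = controlled_shutdown(Partition(2, {0, 1, 2}), 2, 0, fetch_arrives_late=False)
late = controlled_shutdown(Partition(2, {0, 1, 2}), 2, 0, fetch_arrives_late=True)
print(len(early.isr), len(late.isr))  # prints "2 3"
```

In both runs the leader has moved; only the ISR size differs, depending on
when the fetch request arrives.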

The shutdown itself is working correctly in that the leader has been
successfully moved. This patch fixes the assertion by checking the controller's
cached ISR instead of ZooKeeper. The patch also fixes the noisy
ZooKeeper-related messages that appear when the test fails: the brokers were
not being shut down and kept trying to talk to the torn-down ZooKeeper.

I think it would be better to fix the corner case in a separate, non-blocker
JIRA. One possible approach is to use the callback feature in
ControllerBrokerRequestBatch and wait until the shutting-down broker has
processed the StopReplicaRequest before shrinking the ISR; there are probably
other ways as well.
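
A rough sketch of that ordering, again as a toy model: the names below are
hypothetical and do not reflect the actual ControllerBrokerRequestBatch API,
and the controller and the new leader are collapsed into one object for
brevity.

```python
# Toy model of the proposed ordering; all names here are hypothetical.
class Follower:
    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.fetching = True

    def handle_stop_replica(self, on_processed):
        self.fetching = False   # stop the fetcher first...
        on_processed()          # ...then acknowledge via the callback


class Controller:
    def __init__(self, isr):
        self.isr = set(isr)

    def on_fetch(self, follower):
        # Stand-in for the new leader expanding the ISR on a fetch request.
        if follower.fetching:
            self.isr.add(follower.broker_id)

    def shut_down(self, follower):
        # Shrink the ISR only inside the callback, once fetching has stopped.
        follower.handle_stop_replica(
            lambda: self.isr.discard(follower.broker_id))


controller = Controller(isr={0, 1, 2})
follower = Follower(broker_id=2)
controller.on_fetch(follower)   # an early fetch is harmless
controller.shut_down(follower)
controller.on_fetch(follower)   # a late fetch can no longer re-expand the ISR
print(sorted(controller.isr))   # prints "[0, 1]"
```

Because the shrink happens only after the broker has stopped fetching, no
later fetch request can put it back into the ISR in this model.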
> AdminTest.testShutdownBroker fails
> ----------------------------------
>
> Key: KAFKA-633
> URL: https://issues.apache.org/jira/browse/KAFKA-633
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Joel Koshy
> Priority: Blocker
> Attachments: KAFKA-633-v1.patch
>
>
> [error] Test Failed: testShutdownBroker(kafka.admin.AdminTest)
> junit.framework.AssertionFailedError: expected:<2> but was:<3>
> at junit.framework.Assert.fail(Assert.java:47)
> at junit.framework.Assert.failNotEquals(Assert.java:277)
> at junit.framework.Assert.assertEquals(Assert.java:64)
> at junit.framework.Assert.assertEquals(Assert.java:195)
> at junit.framework.Assert.assertEquals(Assert.java:201)
> at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at junit.framework.TestCase.runTest(TestCase.java:164)
> at junit.framework.TestCase.runBare(TestCase.java:130)
> at junit.framework.TestResult$1.protect(TestResult.java:110)
> at junit.framework.TestResult.runProtected(TestResult.java:128)
> at junit.framework.TestResult.run(TestResult.java:113)
> at junit.framework.TestCase.run(TestCase.java:120)
> at junit.framework.TestSuite.runTest(TestSuite.java:228)
> at junit.framework.TestSuite.run(TestSuite.java:223)
> at junit.framework.TestSuite.runTest(TestSuite.java:228)
> at junit.framework.TestSuite.run(TestSuite.java:223)
> at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
> at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
> at sbt.TestRunner.run(TestFramework.scala:53)
> at sbt.TestRunner.runTest$1(TestFramework.scala:67)
> at sbt.TestRunner.run(TestFramework.scala:76)
> at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
> at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
> at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
> at sbt.NamedTestTask.run(TestFramework.scala:92)
> at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
> at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
> at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
> at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
> at sbt.impl.RunTask.runTask(RunTask.scala:85)
> at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
> at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
> at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
> at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
> at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
> at sbt.Control$.trapUnit(Control.scala:19)
> at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)