[ https://issues.apache.org/jira/browse/KAFKA-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248119#comment-15248119 ]
Jason Gustafson commented on KAFKA-3117:
----------------------------------------

I ran this test case for about an hour and couldn't reproduce the failure. The only explanation I can come up with is that the consumer doesn't have metadata for the second topic when it recomputes the assignment. Looking at the code, it does seem possible for bad interplay between a concurrent metadata fetch and a rebalance. We assume in the code that the metadata fetch will return before the rebalance begins, but it actually seems possible for the rebalance to begin first since the TopicMetadata request could end up blocking behind a fetch request. This appears to cause a race condition. If the TopicMetadata returns before the rebalance has completed, we can end up ignoring the change to the partition state. Basically the sequence I'm imagining looks like this:

1. User is initially subscribed to [A]
2. User calls subscribe([A, B]) and then poll()
3. TopicMetadata request is sent for [A, B]
4. Rebalance begins. JoinGroup round completes, the client performs the assignment using topic metadata for [A] and sends SyncGroup.
5. TopicMetadata request returns with partition metadata for [A, B] and we set the flag in SubscriptionState to note that a reassignment is needed
6. SyncGroup returns and unsets the flag in SubscriptionState

After this happens, we have up-to-date metadata, but we incorrectly believe that we've already done the assignment based on that metadata. As far as I can tell, we'll stay in this state until the next rebalance (not the next metadata fetch).

> Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance
> ---------------------------------------------------------------
>
>                 Key: KAFKA-3117
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3117
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.9.0.0
>         Environment: oracle java764bit
> ubuntu 13.10
>            Reporter: edwardt
>            Assignee: Jason Gustafson
>              Labels: newbie, test
>
> java.lang.AssertionError: Expected partitions [topic-0, topic-1, topic2-0, topic2-1] but actually got [topic-0, topic-1]
>       at org.junit.Assert.fail(Assert.java:88)
>       at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:730)
>       at kafka.api.BaseConsumerTest.testAutoCommitOnRebalance(BaseConsumerTest.scala:125)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:22

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
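
For anyone following along, the six-step sequence in the comment can be replayed deterministically as a toy model. This is only a sketch of the suspected ordering, not the consumer's actual code: the class `ToySubscriptionState`, the field `needsReassignment`, and the method `replay` are hypothetical names invented for illustration, and the real client tracks partitions rather than bare topic names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StaleAssignmentRace {
    // Hypothetical stand-in for the consumer's subscription bookkeeping.
    static class ToySubscriptionState {
        boolean needsReassignment = false;
        List<String> assignedTopics = new ArrayList<>();
    }

    // Replays steps 1-6 in the suspected order and returns the final state.
    static ToySubscriptionState replay() {
        ToySubscriptionState state = new ToySubscriptionState();

        // Step 1: the client only has metadata for topic A.
        List<String> knownMetadata = new ArrayList<>(Arrays.asList("A"));

        // Step 2: the user subscribes to [A, B] and polls.
        List<String> subscription = Arrays.asList("A", "B");

        // Step 3: a metadata request for [A, B] is in flight; no response yet.

        // Step 4: the rebalance starts first, so the assignment is computed
        // from whatever metadata is known at this point, which is only [A].
        List<String> computed = new ArrayList<>();
        for (String topic : subscription) {
            if (knownMetadata.contains(topic)) {
                computed.add(topic);
            }
        }

        // Step 5: the metadata response arrives and flags a reassignment.
        knownMetadata.add("B");
        state.needsReassignment = true;

        // Step 6: SyncGroup returns, applies the stale assignment, and
        // clears the flag that step 5 just set.
        state.assignedTopics = computed;
        state.needsReassignment = false;

        return state;
    }

    public static void main(String[] args) {
        ToySubscriptionState state = replay();
        System.out.println("assigned topics: " + state.assignedTopics);
        System.out.println("reassignment pending: " + state.needsReassignment);
    }
}
```

In this model the final state has only topic A assigned and no reassignment pending, even though metadata for B is known by then. That matches the shape of the assertion failure quoted in the issue, where only the partitions of the first topic were assigned.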