[ https://issues.apache.org/jira/browse/KAFKA-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248119#comment-15248119 ]

Jason Gustafson commented on KAFKA-3117:
----------------------------------------

I ran this test case for about an hour and couldn't reproduce the failure. The
only explanation I can come up with is that the consumer doesn't have metadata
for the second topic when it recomputes the assignment. Looking at the code,
there does seem to be room for bad interplay between a concurrent metadata
fetch and a rebalance. The code assumes that the metadata fetch will return
before the rebalance begins, but the rebalance can actually begin first, since
the TopicMetadata request could end up blocked behind a fetch request. This
opens up a race condition: if the TopicMetadata response arrives after the
rebalance has started but before it has completed, we can end up ignoring the
change to the partition state. Basically, the sequence I'm imagining looks like
this:

1. User is initially subscribed to [A]
2. User calls subscribe([A, B]) and then poll()
3. TopicMetadata request is sent for [A, B]
4. Rebalance begins. JoinGroup round completes, the client performs the 
assignment using topic metadata for [A] and sends SyncGroup.
5. TopicMetadata request returns with partition metadata for [A, B] and we set 
the flag in SubscriptionState to note that a reassignment is needed
6. SyncGroup returns and unsets the flag in SubscriptionState
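To make the interleaving concrete, here is a deterministic replay of steps 1-6
against a toy model with a single "reassignment needed" flag. This is a sketch
under assumptions: SimpleSubscriptionState and needsReassignment are
hypothetical names, and the model deliberately leaves out everything Kafka's
real SubscriptionState and coordinator do beyond this one flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified replay of steps 1-6. The names
// (SimpleSubscriptionState, needsReassignment) are illustrative only and do
// not mirror Kafka's SubscriptionState or ConsumerCoordinator.
public class RebalanceRaceReplay {

    static class SimpleSubscriptionState {
        boolean needsReassignment = false;
        List<String> assignedTopics = new ArrayList<>();
    }

    static String replay() {
        SimpleSubscriptionState subs = new SimpleSubscriptionState();

        // 1. Initially subscribed to (and assigned) [A]; metadata covers [A]
        List<String> knownMetadata = new ArrayList<>(Arrays.asList("A"));
        subs.assignedTopics = new ArrayList<>(knownMetadata);

        // 2. subscribe([A, B]) + poll(): a new assignment is needed
        subs.needsReassignment = true;

        // 3. TopicMetadata request for [A, B] is sent but has not returned yet

        // 4. JoinGroup completes; the assignment is computed from metadata for [A]
        List<String> computedAssignment = new ArrayList<>(knownMetadata);

        // 5. TopicMetadata for [A, B] arrives; the flag is set again
        knownMetadata = new ArrayList<>(Arrays.asList("A", "B"));
        subs.needsReassignment = true;

        // 6. SyncGroup returns: install the stale assignment and clear the flag,
        //    which silently discards the request made in step 5
        subs.assignedTopics = computedAssignment;
        subs.needsReassignment = false;

        return "metadata=" + knownMetadata
                + " assigned=" + subs.assignedTopics
                + " needsReassignment=" + subs.needsReassignment;
    }

    public static void main(String[] args) {
        // prints: metadata=[A, B] assigned=[A] needsReassignment=false
        System.out.println(replay());
    }
}
```

The final state matches the test failure: metadata knows about B, but the
assignment only covers A and nothing is flagged for another round.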

After this happens, we have up-to-date metadata, but we incorrectly believe 
that we've already done the assignment based on that metadata. As far as I can 
tell, we'll stay in this state until the next rebalance (not the next metadata 
fetch).
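One possible way to avoid losing the update, sketched here purely as an
assumption and not as the actual Kafka fix: instead of a boolean that a later
SyncGroup can clear, remember which metadata the assignment was computed from
and compare it with the latest metadata once SyncGroup returns. SnapshotCheck
and its methods are hypothetical; a topic-to-partition-count map stands in for
cluster metadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mitigation sketch, not Kafka's code: record the metadata used
// to compute the assignment and rejoin if metadata changed before SyncGroup
// completed, rather than relying on a flag that SyncGroup unconditionally clears.
public class SnapshotCheck {

    // topic -> partition count, a stand-in for cluster metadata
    private Map<String, Integer> latestMetadata = new HashMap<>();
    private Map<String, Integer> assignmentSnapshot = null;

    void onMetadataUpdate(Map<String, Integer> metadata) {
        latestMetadata = new HashMap<>(metadata);
    }

    // Called when JoinGroup completes and the assignment is computed.
    void onAssignmentComputed() {
        assignmentSnapshot = new HashMap<>(latestMetadata);
    }

    // Called after SyncGroup returns: true if the assignment is already stale.
    boolean needRejoinAfterSync() {
        return assignmentSnapshot == null || !assignmentSnapshot.equals(latestMetadata);
    }

    public static void main(String[] args) {
        SnapshotCheck c = new SnapshotCheck();
        Map<String, Integer> onlyA = new HashMap<>();
        onlyA.put("A", 2);
        c.onMetadataUpdate(onlyA);

        c.onAssignmentComputed();   // step 4: assignment based on [A]

        Map<String, Integer> aAndB = new HashMap<>(onlyA);
        aAndB.put("B", 2);
        c.onMetadataUpdate(aAndB);  // step 5: metadata for [A, B] arrives

        // step 6: prints "needRejoin=true", so the stale assignment is caught
        System.out.println("needRejoin=" + c.needRejoinAfterSync());
    }
}
```

Comparing snapshots means an update that lands mid-rebalance can't be erased by
the SyncGroup response; the cost is at most one extra rebalance.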

> Fail test at: PlaintextConsumerTest.testAutoCommitOnRebalance
> -------------------------------------------------------------
>
>                 Key: KAFKA-3117
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3117
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.9.0.0
>         Environment: Oracle Java 7 64-bit
> Ubuntu 13.10
>            Reporter: edwardt
>            Assignee: Jason Gustafson
>              Labels: newbie, test
>
> java.lang.AssertionError: Expected partitions [topic-0, topic-1, topic2-0, topic2-1] but actually got [topic-0, topic-1]
>       at org.junit.Assert.fail(Assert.java:88)
>       at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:730)
>       at kafka.api.BaseConsumerTest.testAutoCommitOnRebalance(BaseConsumerTest.scala:125)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:22



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
