[ 
https://issues.apache.org/jira/browse/KAFKA-4569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877598#comment-15877598
 ] 

Armin Braun commented on KAFKA-4569:
------------------------------------

Just for future reference. Tried to debug a few issues around this that I found 
and figured out that this is coming from the heartbeat thread.

Basically it's getting enabled without any delay in 
`org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup`
 and then in some cases polls quickly enough to make 
`org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce` break out here:

{code}
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long 
timeout) {
        coordinator.poll(time.milliseconds());

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = 
fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
{code}

Not quite sure how to fix this, but you can reproduce it by running it in an 
endless loop (like 2ms per run and dies after 500 or so on average :)) -> red 
... goes green once you outcomment 
this enable section

{code}
                        if (heartbeatThread != null)
                            heartbeatThread.enable();
                    }
                }
{code}

in 
`org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup`

> Transient failure in 
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4569
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4569
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: unit tests
>            Reporter: Guozhang Wang
>            Assignee: Umesh Chaudhary
>              Labels: newbie
>             Fix For: 0.10.3.0
>
>
> One example is:
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/370/testReport/junit/org.apache.kafka.clients.consumer/KafkaConsumerTest/testWakeupWithFetchDataAvailable/
> {code}
> Stacktrace
> java.lang.AssertionError
>       at org.junit.Assert.fail(Assert.java:86)
>       at org.junit.Assert.fail(Assert.java:95)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable(KafkaConsumerTest.java:679)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>       at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>       at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>       at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>       at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>       at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
>       at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>       at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to