[ 
https://issues.apache.org/jira/browse/KAFKA-4569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15929675#comment-15929675
 ] 

ASF GitHub Bot commented on KAFKA-4569:
---------------------------------------

GitHub user original-brownbear opened a pull request:

    https://github.com/apache/kafka/pull/2699

    KAFKA-4569: Make KafkaConsumer Trigger Wakeup before Updating Offsets

    An attempt at resolving https://issues.apache.org/jira/browse/KAFKA-4569 
using the @hachikuji suggestions from 
https://issues.apache.org/jira/browse/KAFKA-4569?focusedCommentId=15901556&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15901556.
    
    I tried to use this:
    
    > We could also add a hasPendingWakeup() or something to 
ConsumerNetworkClient. If a wakeup is expected, then we can just call poll(0) 
to trigger it.
    
    but instead of adding a new method to `ConsumerNetworkClient` I simply 
exposed 
`org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#maybeTriggerWakeup`
 publicly. Since we still have to synchronize to not run into issues with 
concurrent calls to 
`org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#pollNoWakeup`
 by the heartbeat thread, this seemed to be the shortest route to triggering 
the wake up if one is pending before committing offsets.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/original-brownbear/kafka KAFKA-4569

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2699.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2699
    
----
commit 6d8e3f616ab5cfe3e961a79b77dba4576d61e55d
Author: Armin Braun <armin.br...@1und1.de>
Date:   2017-03-17T09:48:23Z

    KAFKA-4569 make KafkaConsumer trigger wakeup before updating offsets

----


> Transient failure in 
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4569
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4569
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: unit tests
>            Reporter: Guozhang Wang
>            Assignee: Umesh Chaudhary
>              Labels: newbie
>             Fix For: 0.11.0.0, 0.10.2.1
>
>
> One example is:
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/370/testReport/junit/org.apache.kafka.clients.consumer/KafkaConsumerTest/testWakeupWithFetchDataAvailable/
> {code}
> Stacktrace
> java.lang.AssertionError
>       at org.junit.Assert.fail(Assert.java:86)
>       at org.junit.Assert.fail(Assert.java:95)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable(KafkaConsumerTest.java:679)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>       at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>       at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>       at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>       at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>       at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
>       at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>       at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to