[ 
https://issues.apache.org/jira/browse/KAFKA-4885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925516#comment-15925516
 ] 

Guozhang Wang commented on KAFKA-4885:
--------------------------------------

I looked into the logs; the root cause is that the broker got into a 
bad state:

{code}
[2017-03-14 04:59:10,795] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.util.ConcurrentModificationException
        at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
        at java.util.HashMap$EntryIterator.next(HashMap.java:962)
        at java.util.HashMap$EntryIterator.next(HashMap.java:960)
        at org.apache.kafka.common.utils.CollectionUtils.groupDataByTopic(CollectionUtils.java:35)
        at org.apache.kafka.common.requests.ProduceRequest.toStruct(ProduceRequest.java:113)
        at org.apache.kafka.common.requests.AbstractRequest.toString(AbstractRequest.java:84)
        at java.lang.String.valueOf(String.java:2849)
        at java.lang.StringBuilder.append(StringBuilder.java:128)
        at scala.StringContext.standardInterpolator(StringContext.scala:122)
        at scala.StringContext.s(StringContext.scala:90)
        at kafka.network.RequestChannel$Request.requestDesc(RequestChannel.scala:111)
        at kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:169)
        at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:532)
        at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:528)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedSends(SocketServer.scala:528)
        at kafka.network.Processor.run(SocketServer.scala:434)
        at java.lang.Thread.run(Thread.java:745)
{code}

I looked at the existing JIRAs: there are some similar ones, but none of them 
seems to be exactly the same as this one. If that is really the case, we can 
file a new JIRA. cc [~ijuma]
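
For readers unfamiliar with the exception at the top of that trace, here is a minimal sketch of how a {{HashMap}} iterator fails fast when the map is structurally modified during iteration. It is a single-threaded illustration only: the class name, method name and map contents are made up, and it does not reproduce the broker code path, where the modification presumably comes from another thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of Kafka.
final class ConcurrentModificationDemo {

    // Returns true if iterating the map while adding a new key fails fast.
    static boolean iterationFailsFast() {
        Map<String, Integer> recordsPerPartition = new HashMap<>();
        recordsPerPartition.put("topic-0", 1);
        recordsPerPartition.put("topic-1", 2);
        try {
            for (Map.Entry<String, Integer> entry : recordsPerPartition.entrySet()) {
                // Adding a new key is a structural modification; the iterator
                // detects it on its next call to next().
                recordsPerPartition.put("topic-2", 3);
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("fails fast: " + iterationFailsFast());
    }
}
```

The check is best effort: with unsynchronized access from several threads, the exception is not guaranteed to be thrown on every run, which would be consistent with an occasional failure.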

Because of that, producer requests are not handled, so the records eventually 
expire and the sends are aborted:

{code}
[2017-03-14 04:59:45,266] ERROR task [0_0] Error sending record to topic simpleBenchmarkSinkTopic. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.TimeoutException: Expiring 118 record(s) for simpleBenchmarkSinkTopic-0: 30266 ms has passed since last append
[2017-03-14 04:59:45,266] ERROR task [0_0] Error sending record to topic simpleBenchmarkSinkTopic. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.TimeoutException: Expiring 118 record(s) for simpleBenchmarkSinkTopic-0: 30266 ms has passed since last append
[2017-03-14 04:59:45,267] ERROR task [0_0] Error sending record to topic simpleBenchmarkSinkTopic. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
...
{code}
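
The pattern those log lines describe (record the send failure when the callback fires, then throw it later from the processing thread) can be sketched as follows. This is a simplified model under my own assumptions, not the actual {{RecordCollectorImpl}} code: the class and method names are hypothetical, and {{IllegalStateException}} stands in for {{StreamsException}} so that the example runs without Kafka on the classpath.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a record collector; not Kafka's implementation.
final class DeferredSendErrorSketch {

    // First send failure seen by the (simulated) producer callback.
    private final AtomicReference<Exception> sendException = new AtomicReference<>();

    // Called when an asynchronous send completes; error is null on success.
    void onSendCompleted(Exception error) {
        if (error != null) {
            // Keep only the first failure; later ones are ignored.
            sendException.compareAndSet(null, error);
        }
    }

    // Called on the processing thread at commit time; rethrows a recorded failure.
    void flush() {
        Exception recorded = sendException.get();
        if (recorded != null) {
            throw new IllegalStateException("exception caught when producing", recorded);
        }
    }

    public static void main(String[] args) {
        DeferredSendErrorSketch collector = new DeferredSendErrorSketch();
        collector.onSendCompleted(new TimeoutException("records expired"));
        try {
            collector.flush();
        } catch (IllegalStateException e) {
            System.out.println("commit failed: " + e.getCause().getMessage());
        }
    }
}
```

In this model the failure surfaces only at the next flush, which matches the gap between the send errors above and the later commit failure.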

This logic was added in https://issues.apache.org/jira/browse/KAFKA-4473 
(https://github.com/apache/kafka/pull/2249). Because of it, the thread is 
shut down in the finally block of {{StreamThread}} at a much earlier 
time (we do not have an uncaught exception handler):

{code}
[2017-03-14 04:59:45,873] ERROR stream-thread [simple-benchmark-streams-with-sink-b59e4280-9953-439e-93cd-0e46d46c2cea-StreamThread-1] Failed to commit StreamTask 0_0 state:  (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:118)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:126)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:77)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:282)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:727)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:714)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:690)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:318)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 118 record(s) for simpleBenchmarkSinkTopic-0: 30075 ms has passed since last append
{code}
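
To illustrate the remark about the missing uncaught exception handler, here is a minimal sketch with a plain {{java.lang.Thread}}: the worker's finally block runs, the thread dies, and a registered handler is the only place where the failure is observed. The class name, thread name and exception message are hypothetical; this is not how the benchmark or {{StreamThread}} is actually wired.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; uses only java.lang.Thread, no Kafka classes.
final class UncaughtHandlerSketch {

    // Starts a worker that fails and returns the throwable the handler observed.
    static Throwable runFailingWorker() {
        AtomicReference<Throwable> observed = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                throw new IllegalStateException("exception caught when producing");
            } finally {
                // Cleanup runs first; the exception then propagates out of run().
                System.out.println("worker shutting down");
            }
        }, "stream-thread-sketch");
        // Without a handler, the JVM would only print the stack trace to stderr.
        worker.setUncaughtExceptionHandler((thread, error) -> observed.set(error));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed.get();
    }

    public static void main(String[] args) {
        Throwable error = runFailingWorker();
        System.out.println("handler observed: " + error);
    }
}
```

With a handler like this, the owning process could react as soon as the thread dies (for example by exiting) instead of staying alive with no worker thread.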

Hence, when the service tries to shut down the instance later (two minutes 
later), there is no thread any more, so the shutdown times out and fails.

{code}
[INFO  - 2017-03-14 05:11:08,281 - runner_client - log - lineno:221]: RunnerClient: kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1: Summary: Streams Test process on ubuntu@worker1 took too long to exit
{code}

> processstreamwithcachedstatestore and other streams benchmarks fail 
> occasionally
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-4885
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4885
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Eno Thereska
>             Fix For: 0.11.0.0
>
>
> test_id:    
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithcachedstatestore.scale=2
> status:     FAIL
> run time:   14 minutes 58.069 seconds
>     Streams Test process on ubuntu@worker5 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
>     data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
>     return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
>     self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
>     self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
>     wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
>     raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker5 took too long to exit
> The log contains several lines like:
> [2017-03-11 04:52:59,080] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:01,987] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:02,088] DEBUG Attempt to heartbeat failed for group 
> simple-benchmark-streams-with-storetrue since it is rebalancing. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:04,995] DEBUG Sending Heartbeat request for group 
> simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 
> 2147483646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Other tests that fail the same way include:
> test_id:    
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=count.scale=2
> test_id:    
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1
> test_id:    
> kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
