[ https://issues.apache.org/jira/browse/KAFKA-4885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925516#comment-15925516 ]
Guozhang Wang commented on KAFKA-4885:
--------------------------------------

Looked into the logs; the root cause is actually that the broker got into a bad state:

{code}
[2017-03-14 04:59:10,795] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
    at java.util.HashMap$EntryIterator.next(HashMap.java:962)
    at java.util.HashMap$EntryIterator.next(HashMap.java:960)
    at org.apache.kafka.common.utils.CollectionUtils.groupDataByTopic(CollectionUtils.java:35)
    at org.apache.kafka.common.requests.ProduceRequest.toStruct(ProduceRequest.java:113)
    at org.apache.kafka.common.requests.AbstractRequest.toString(AbstractRequest.java:84)
    at java.lang.String.valueOf(String.java:2849)
    at java.lang.StringBuilder.append(StringBuilder.java:128)
    at scala.StringContext.standardInterpolator(StringContext.scala:122)
    at scala.StringContext.s(StringContext.scala:90)
    at kafka.network.RequestChannel$Request.requestDesc(RequestChannel.scala:111)
    at kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:169)
    at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:532)
    at kafka.network.Processor$$anonfun$processCompletedSends$1.apply(SocketServer.scala:528)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.processCompletedSends(SocketServer.scala:528)
    at kafka.network.Processor.run(SocketServer.scala:434)
    at java.lang.Thread.run(Thread.java:745)
{code}

Looked at the existing JIRAs; there are some similar ones, but it seems none of them are exactly the same as this one. If that is really the case we can file a new JIRA. cc [~ijuma]

Because of that, the producer requests will not be handled and hence will eventually be expired and aborted:

{code}
[2017-03-14 04:59:45,266] ERROR task [0_0] Error sending record to topic simpleBenchmarkSinkTopic. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.TimeoutException: Expiring 118 record(s) for simpleBenchmarkSinkTopic-0: 30266 ms has passed since last append
[2017-03-14 04:59:45,266] ERROR task [0_0] Error sending record to topic simpleBenchmarkSinkTopic. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.TimeoutException: Expiring 118 record(s) for simpleBenchmarkSinkTopic-0: 30266 ms has passed since last append
[2017-03-14 04:59:45,267] ERROR task [0_0] Error sending record to topic simpleBenchmarkSinkTopic. No more offsets will be recorded for this task and the exception will eventually be thrown (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
...
{code}
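(Side note on the broker-side exception above: {{CollectionUtils.groupDataByTopic}} iterates over the request's partition map, and {{java.util.HashMap}}'s fail-fast iterator throws {{ConcurrentModificationException}} when the map is structurally modified while it is being iterated. A minimal, purely illustrative Java sketch of that failure mode follows; it is not Kafka code, and the class and variable names are made up:)

{code}
import java.util.HashMap;
import java.util.Map;

public class HashMapCmeDemo {
    public static void main(String[] args) throws Exception {
        // Shared, non-thread-safe map, standing in for the per-partition data
        // that one thread iterates while another thread mutates it.
        Map<String, Integer> partitions = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            partitions.put("topic-" + i, i);
        }

        // Writer thread keeps structurally modifying the map.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                partitions.put("extra-" + i, i);
            }
        });
        writer.start();

        // Reader iterates concurrently; HashMap's fail-fast iterator will
        // typically throw ConcurrentModificationException, as in the broker log.
        try {
            long sum = 0;
            for (Map.Entry<String, Integer> e : partitions.entrySet()) {
                sum += e.getValue();
            }
            System.out.println("iterated without a CME, sum=" + sum);
        } catch (java.util.ConcurrentModificationException cme) {
            System.out.println("got the expected ConcurrentModificationException");
        }
        writer.join();
    }
}
{code}

The sketch only illustrates the exception itself, not the actual broker code path that shares the request data across threads.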
This logic was added in https://issues.apache.org/jira/browse/KAFKA-4473 (https://github.com/apache/kafka/pull/2249), and because of it the thread gets shut down in the {{finally}} block of {{StreamThread}} at a much earlier time (we do not have the uncaught exception handler):

{code}
[2017-03-14 04:59:45,873] ERROR stream-thread [simple-benchmark-streams-with-sink-b59e4280-9953-439e-93cd-0e46d46c2cea-StreamThread-1] Failed to commit StreamTask 0_0 state: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:118)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:126)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:77)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:282)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:727)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:714)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:318)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 118 record(s) for simpleBenchmarkSinkTopic-0: 30075 ms has passed since last append
{code}

And hence when the service tries to shut down the instance later (two minutes later), there is no thread any more, so the shutdown times out and the test fails, as shown in the ducktape summary further below.
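(Another side note: the "uncaught exception handler" above presumably refers to the hook that {{KafkaStreams}} exposes for its stream threads. A minimal sketch of registering one, assuming the 0.10.2-era API; the topology, topic names, and config values are illustrative only:)

{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class HandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Illustrative values only.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Trivial pass-through topology; topic names are assumptions, not from this ticket's code.
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("simpleBenchmarkSourceTopic").to("simpleBenchmarkSinkTopic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        // Invoked when a stream thread dies from an unhandled exception, so the
        // embedding service can log the error and shut down promptly instead of
        // waiting on a thread that no longer exists.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
        streams.start();
    }
}
{code}

With such a handler the embedding service at least gets notified when a stream thread dies, rather than discovering it only when its shutdown wait times out.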
{code}
[INFO - 2017-03-14 05:11:08,281 - runner_client - log - lineno:221]: RunnerClient: kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1: Summary: Streams Test process on ubuntu@worker1 took too long to exit
{code}


> processstreamwithcachedstatestore and other streams benchmarks fail occasionally
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-4885
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4885
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Eno Thereska
>             Fix For: 0.11.0.0
>
>
> test_id: kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithcachedstatestore.scale=2
> status: FAIL
> run time: 14 minutes 58.069 seconds
> Streams Test process on ubuntu@worker5 took too long to exit
> Traceback (most recent call last):
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run
>     data = self.run_test()
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test
>     return self.test_context.function(self.test)
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py", line 86, in test_simple_benchmark
>     self.driver[num].wait()
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py", line 102, in wait
>     self.wait_node(node, timeout_sec)
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py", line 106, in wait_node
>     wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", line 36, in wait_until
>     raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker5 took too long to exit
> The log contains several lines like:
> [2017-03-11 04:52:59,080] DEBUG Attempt to heartbeat failed for group simple-benchmark-streams-with-storetrue since it is rebalancing. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:01,987] DEBUG Sending Heartbeat request for group simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:02,088] DEBUG Attempt to heartbeat failed for group simple-benchmark-streams-with-storetrue since it is rebalancing. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-03-11 04:53:04,995] DEBUG Sending Heartbeat request for group simple-benchmark-streams-with-storetrue to coordinator worker10:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Other tests that fail the same way include:
> test_id: kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=count.scale=2
> test_id: kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=processstreamwithsink.scale=1
> test_id: kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)