After a while the instance started running.

2017-05-05 22:40:26.806 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing task StreamTask 1_62
(this is literally the next message)
2017-05-05 23:13:27.820 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing all tasks because the commit interval 10000ms has elapsed
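The two consecutive log lines above are roughly 33 minutes apart, even though the commit interval is configured as 10000 ms. Below is a minimal sketch that checks gaps like this directly from the log timestamps. It uses only the Java standard library. `LogGap` and `gap` are illustrative names, not Kafka APIs.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogGap {

    // Timestamp layout of the log lines in this thread, e.g. "2017-05-05 22:40:26.806".
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Wall-clock time elapsed between two log timestamps.
    static Duration gap(String earlier, String later) {
        return Duration.between(LocalDateTime.parse(earlier, FMT), LocalDateTime.parse(later, FMT));
    }

    public static void main(String[] args) {
        long commitIntervalMs = 10_000; // COMMIT_INTERVAL_MS_CONFIG in the quoted configuration
        Duration d = gap("2017-05-05 22:40:26.806", "2017-05-05 23:13:27.820");
        System.out.printf("gap = %d ms (about %d min), %d full commit intervals%n",
                d.toMillis(), d.toMinutes(), d.toMillis() / commitIntervalMs);
        // prints: gap = 1981014 ms (about 33 min), 198 full commit intervals
    }
}
```

The same helper can be applied to the timestamps in the quoted message. The "16 minutes" without logs (21:29:21.388 to 21:45:05.270) comes out at 943882 ms, about 15.7 minutes. The "~10 minutes" gap (21:45:05.379 to 21:55:16.835) comes out at 611456 ms.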
On Fri, May 5, 2017 at 3:48 PM João Peixoto <joao.harti...@gmail.com> wrote:
> Warning, long message
>
> *Problem*: Initializing a Kafka Stream is taking a loooong time.
> Currently at the 40 minute mark
>
> *Setup*:
> 2 co-partitioned topics with 100 partitions.
> First topic contains a lot of messages, in the order of hundreds of millions
> Second topic is a KTable and contains ~30k records
>
> Kafka cluster with 6 brokers running 0.10.1
>
> Kafka Streams running on 0.10.2.1. 5 instances with 5 threads each.
> The instances are running on Kubernetes
>
> *Stream Configuration*:
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
>
> *The events*:
> I started 5 instances of my stream configuration at the same time. This is
> the first time this configuration is running.
>
> 2017-05-05 21:23:03.283 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating producer client
> 2017-05-05 21:23:03.415 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating consumer client
> 2017-05-05 21:23:03.520 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating restore consumer client
> 2017-05-05 21:23:03.528 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from NOT_RUNNING to RUNNING.
> 2017-05-05 21:23:03.531 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating producer client
> 2017-05-05 21:23:03.564 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating consumer client
> 2017-05-05 21:23:03.569 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating restore consumer client
> 2017-05-05 21:23:03.615 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from NOT_RUNNING to RUNNING.
> 2017-05-05 21:23:03.617 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating producer client
> 2017-05-05 21:23:03.621 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating consumer client
> 2017-05-05 21:23:03.625 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating restore consumer client
> 2017-05-05 21:23:03.628 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from NOT_RUNNING to RUNNING.
> 2017-05-05 21:23:03.629 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating producer client
> 2017-05-05 21:23:03.632 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating consumer client
> 2017-05-05 21:23:03.635 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating restore consumer client
> 2017-05-05 21:23:03.638 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from NOT_RUNNING to RUNNING.
> 2017-05-05 21:23:03.639 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating producer client
> 2017-05-05 21:23:03.641 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating consumer client
> 2017-05-05 21:23:03.644 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating restore consumer client
> 2017-05-05 21:23:03.647 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from NOT_RUNNING to RUNNING.
> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Starting
> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Starting
> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Starting
> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Starting
> 2017-05-05 21:23:03.792 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Starting
> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from RUNNING to PARTITIONS_REVOKED.
> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.
> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from RUNNING to PARTITIONS_REVOKED.
> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from RUNNING to PARTITIONS_REVOKED.
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Updating suspended tasks to contain active tasks []
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks []
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Updating suspended tasks to contain active tasks []
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks []
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Updating suspended tasks to contain active tasks []
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Removing all active tasks []
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all active tasks []
> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Removing all active tasks []
> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Removing all active tasks []
> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Removing all active tasks []
> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Removing all standby tasks []
> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all standby tasks []
> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Removing all standby tasks []
> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Removing all standby tasks []
> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Removing all standby tasks []
> 2017-05-05 21:23:04.020 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> 2017-05-05 21:23:04.218 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> 2017-05-05 21:23:04.591 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> 2017-05-05 21:23:04.726 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 100.0]}.
> 2017-05-05 21:23:04.742 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 0.0]}} from the member subscriptions.
> 2017-05-05 21:23:05.120 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> 2017-05-05 21:23:05.482 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> 2017-05-05 21:23:05.520 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0], da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0]}.
> 2017-05-05 21:23:05.553 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned at the end of consumer rebalance.
> *// The above line is repeated for each thread*
> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> *// omitted*
> 2017-05-05 21:23:15.596 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from ASSIGNING_PARTITIONS to RUNNING.
> *// above message repeated for each thread*
>
> *Important*: At this point only StreamThread-4 is performing commits
> every 10 seconds. The other threads output no logs. Now the fun begins
>
> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.
> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Closing task's topology ... // repeated multiple times
> 2017-05-05 21:29:21.387 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Flushing state stores of task ... // repeated multiple times
> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing consumer offsets of task ... // repeated multiple times
> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks [<list>]
> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all active tasks [<list>]
> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all standby tasks []
>
> At this point there are no more log messages for 16 minutes!! During this
> time I perform several thread dumps, almost every minute.
> Thread dump below.
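In the first dump that follows, StreamThread-1 holds the monitor of a single org.apache.kafka.common.metrics.Sensor instance (<653f9d6>) inside Sensor.record(...). StreamThread-2 and StreamThread-5 are BLOCKED waiting for that same monitor. All three reached it from punctuate(). The sketch below is a rough, self-contained illustration of that pattern in plain Java, not Kafka code. `SharedSensor` and `run` are hypothetical names. Five threads call a synchronized `record()` on one shared object, while a sampling loop polls `Thread.getState()` in the way repeated thread dumps would. The number of BLOCKED samples depends on scheduling and core count, so only the record count is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedSensorContention {

    // Hypothetical stand-in for a metrics sensor shared by all stream threads.
    // record() synchronizes on the instance, so only one thread at a time can be
    // inside it; the others wait for the monitor (shown as BLOCKED in a dump).
    static final class SharedSensor {
        private long count;
        private double total;

        synchronized void record(double value) {
            count++;
            total += value;
        }

        synchronized long count() {
            return count;
        }
    }

    /** Returns {records counted by the sensor, BLOCKED samples observed while running}. */
    static long[] run(int threads, int recordsPerThread) throws InterruptedException {
        SharedSensor sensor = new SharedSensor();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < recordsPerThread; i++) {
                    sensor.record(i); // every call competes for the same monitor
                }
            }, "StreamThread-" + (t + 1));
            workers.add(w);
        }
        workers.forEach(Thread::start);

        // Poll thread states until all workers finish, like taking repeated thread dumps.
        long blockedSamples = 0;
        boolean anyAlive = true;
        while (anyAlive) {
            anyAlive = false;
            for (Thread w : workers) {
                Thread.State s = w.getState();
                if (s == Thread.State.BLOCKED) blockedSamples++;
                if (s != Thread.State.TERMINATED) anyAlive = true;
            }
        }
        for (Thread w : workers) w.join();
        return new long[] {sensor.count(), blockedSamples};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run(5, 200_000);
        System.out.println("records=" + r[0] + ", BLOCKED samples=" + r[1]);
    }
}
```

The real lock ids in a jstack dump identify the shared instance. In the dump, the same id (<653f9d6>) appears under four different stream threads, which is what indicates that a single Sensor is being shared.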
Do notice that thread 4 is the only different one. > > "StreamThread-5" - Thread t@57 > java.lang.Thread.State: BLOCKED > at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169) > - waiting to lock <653f9d6> (a > org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49 > at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) > at > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) > at > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) > at > org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > - locked <3fbbc5a8> (a java.util.PriorityQueue) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-4" - Thread t@55 > java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <5709c085> (a sun.nio.ch.Util$2) > - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet) > - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at > 
org.apache.kafka.common.network.Selector.select(Selector.java:489) > at org.apache.kafka.common.network.Selector.poll(Selector.java:298) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > - locked <639d53ee> (a > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-3" - Thread t@53 > java.lang.Thread.State: RUNNABLE > at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169) > - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor) > at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) > at > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) > at > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) > at > 
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > - locked <7a09baf0> (a java.util.PriorityQueue) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-2" - Thread t@51 > java.lang.Thread.State: BLOCKED > at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169) > - waiting to lock <653f9d6> (a > org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49 > at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) > at > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) > at > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) > at > org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > - locked <2dc188ac> (a java.util.PriorityQueue) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-1" - Thread t@49 > java.lang.Thread.State: RUNNABLE > at 
org.apache.kafka.common.metrics.Sensor.record(Sensor.java:172) > - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor) > at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) > at > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) > at > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) > at > org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > - locked <7dffc3aa> (a java.util.PriorityQueue) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > > (no logs omitted, ~16 minutes later) > 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] > Committing all tasks because the commit interval 10000ms has elapsed > 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] > Committing task StreamTask .. // repeated multiple times > 2017-05-05 21:45:05.379 INFO 71 --- [ StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] > State transition from RUNNING to PARTITIONS_REVOKED. > *// The above is repeated for threads 2, 3 and 5* > (no logs omitted!! 
This is really the next entry, ~10 minutes later) > 2017-05-05 21:55:16.835 INFO 71 --- [ StreamThread-4] > o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] > Constructed client metadata ... > > During the above 10 minutes all threads show the following > "StreamThread-5" - Thread t@57 > java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <4a60e0dd> (a sun.nio.ch.Util$2) > - locked <5e54059f> (a java.util.Collections$UnmodifiableSet) > - locked <60693986> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at > org.apache.kafka.common.network.Selector.select(Selector.java:489) > at org.apache.kafka.common.network.Selector.poll(Selector.java:298) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > - locked <5f1ae6c1> (a > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-4" - Thread t@55 > java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <5709c085> (a sun.nio.ch.Util$2) > - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet) > - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at > org.apache.kafka.common.network.Selector.select(Selector.java:489) > at org.apache.kafka.common.network.Selector.poll(Selector.java:298) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > - locked <639d53ee> (a > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-3" - Thread t@53 > 
java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <123193f7> (a sun.nio.ch.Util$2) > - locked <6c3704d3> (a java.util.Collections$UnmodifiableSet) > - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at > org.apache.kafka.common.network.Selector.select(Selector.java:489) > at org.apache.kafka.common.network.Selector.poll(Selector.java:298) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > - locked <4d9f6f42> (a > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-2" - Thread t@51 > java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at 
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <532ff32d> (a sun.nio.ch.Util$2) > - locked <76a6407> (a java.util.Collections$UnmodifiableSet) > - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at > org.apache.kafka.common.network.Selector.select(Selector.java:489) > at org.apache.kafka.common.network.Selector.poll(Selector.java:298) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > - locked <29b48d84> (a > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > "StreamThread-1" - Thread t@49 > java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <5aeb504> (a 
sun.nio.ch.Util$2) > - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet) > - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at > org.apache.kafka.common.network.Selector.select(Selector.java:489) > at org.apache.kafka.common.network.Selector.poll(Selector.java:298) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > - locked <7b072bc6> (a > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > > Locked ownable synchronizers: > - None > > Eventually they get assigned partitions again, then they are revoked, > another long time passes, threads 1, 2, 3 and 5 stuck on Sensor and we get > into the same situation. > > Finally, I tried starting up only 1 instance (with 5 threads). 
> Current status:
>
> "StreamThread-5" - Thread t@56
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>         - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>         - locked <5bea9b1b> (a java.util.PriorityQueue)
>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
>    Locked ownable synchronizers:
>         - None
>
> "StreamThread-4" - Thread t@54
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>         - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>         - locked <5a06855> (a java.util.PriorityQueue)
>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
>    Locked ownable synchronizers:
>         - None
>
> "StreamThread-3" - Thread t@52
>    java.lang.Thread.State: BLOCKED
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>         - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>         - locked <d3a57bc> (a java.util.PriorityQueue)
>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
>    Locked ownable synchronizers:
>         - None
>
> "StreamThread-2" - Thread t@50
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:175)
>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>         - locked <480f4efc> (a java.util.PriorityQueue)
>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
>    Locked ownable synchronizers:
>         - None
>
> "StreamThread-1" - Thread t@48
>    java.lang.Thread.State: BLOCKED
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>         - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>         - locked <47b226a5> (a java.util.PriorityQueue)
>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
>    Locked ownable synchronizers:
>         - None
>
> This has been going on for over 40 minutes now and the cluster has not
> stabilized. Not sure what to do here; any help is welcome.
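In the single-instance dump, StreamThread-1 and StreamThread-3 are BLOCKED waiting for monitor <6a55b7c6>, which the dump reports as owned by StreamThread-5. The same "who is blocked on which lock, and who owns it" information can also be read from inside the JVM with the standard java.lang.management API, which may be easier to log periodically than taking jstack dumps by hand. This is a generic JDK sketch, not Kafka code: the class name BlockedThreadReport, the thread name demo-worker and the plain Object standing in for the shared Sensor are all hypothetical. It only reports contention; it says nothing about why the lock is contended.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: lists every BLOCKED thread together with the monitor it
 * is waiting for and the thread that owns that monitor, i.e. the same
 * "waiting to lock <...> owned by ..." information a jstack dump shows.
 */
public final class BlockedThreadReport {

    /** One line per BLOCKED thread: "<name> waiting for <lock> owned by <owner>". */
    public static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Lock name and owner are filled in regardless of these two flags;
        // they only control whether held monitors/synchronizers are listed.
        ThreadInfo[] infos = mx.dumpAllThreads(false, false);
        List<String> lines = new ArrayList<>();
        for (ThreadInfo info : infos) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                lines.add(info.getThreadName() + " waiting for " + info.getLockName()
                        + " owned by " + info.getLockOwnerName());
            }
        }
        return lines;
    }

    /**
     * Demo only: a plain Object stands in (hypothetically) for the shared Sensor.
     * The caller holds the lock while a worker tries to enter it, so the worker
     * is reported as BLOCKED with the caller as owner.
     */
    public static List<String> demo() {
        final Object sharedSensorLock = new Object();
        Thread worker = new Thread(() -> {
            synchronized (sharedSensorLock) {
                // A real Sensor.record() would update its stats here.
            }
        }, "demo-worker");
        try {
            List<String> report;
            synchronized (sharedSensorLock) {
                worker.start();
                // Wait until the worker is actually parked on the monitor we hold.
                while (worker.getState() != Thread.State.BLOCKED) {
                    Thread.sleep(1);
                }
                report = blockedThreads();
            }
            worker.join(); // lock released above, so the worker can finish
            return report;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running demo", e);
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main should print a line of the form `demo-worker waiting for java.lang.Object@<hash> owned by main`, with a hash that differs between runs. In a running Streams instance only blockedThreads() would be relevant, for example called from a periodic logging task; how it would be wired in is an assumption, not something the thread above describes.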