[ https://issues.apache.org/jira/browse/KAFKA-6799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443921#comment-16443921 ]

Attila Sasvari edited comment on KAFKA-6799 at 4/19/18 3:00 PM:
----------------------------------------------------------------

[~phdezann] thanks for reporting this issue and creating the docker environment 
for reproducing it.

- I had to set the environment variable {{M2_REPOSITORY}} before running the shell 
script: {{M2_REPOSITORY=/root/.m2/ ./spin.sh}}. After that, I saw the issue you 
described in the description.
- I looked around a bit in the helloworld-kafka-1 docker container and noticed 
that the replication factor for the internal {{__consumer_offsets}} topic was set to 1:
{code}
root@b6e9218f1761:/install/kafka# bin/kafka-topics.sh --describe --zookeeper 172.170.0.80:2181
Topic:__consumer_offsets  PartitionCount:50  ReplicationFactor:1  Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets  Partition: 0  Leader: 103  Replicas: 103  Isr: 103
        Topic: __consumer_offsets  Partition: 1  Leader: 101  Replicas: 101  Isr: 101
        Topic: __consumer_offsets  Partition: 2  Leader: -1   Replicas: 102  Isr: 102
{code}
In this situation, the consumer cannot contact the partition leader for 
{{__consumer_offsets}} partition 2, because the broker hosting its only replica 
was killed by the test. As a result, it cannot commit offsets for that partition.
- I changed the replication factor of {{__consumer_offsets}} to 3, and then I no 
longer saw this issue.
- Can you add the following to {{docker/entrypoint.sh}} and re-test?
{code}
cat >>config/server.properties <<ENDL


offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2
ENDL
{code}
right before Kafka is started:
{code}
# Start kafka
./bin/kafka-server-start.sh config/server.properties \
  --override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} \
  --override broker.id=${KAFKA_BROKER_ID} \
  --override zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}
{code}
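For an already-running cluster, the replication factor of the existing {{__consumer_offsets}} topic can also be raised with a partition reassignment instead of recreating the environment. A minimal sketch for the first three partitions only, assuming the broker ids 101-103 seen in the describe output above (a real run would list all 50 partitions in the JSON file):
{code}
# Sketch: raise the replica set of a few __consumer_offsets partitions to all 3 brokers
cat > increase-rf.json <<EOF
{"version":1,
 "partitions":[
  {"topic":"__consumer_offsets","partition":0,"replicas":[103,101,102]},
  {"topic":"__consumer_offsets","partition":1,"replicas":[101,102,103]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[102,103,101]}
 ]}
EOF
bin/kafka-reassign-partitions.sh --zookeeper 172.170.0.80:2181 \
  --reassignment-json-file increase-rf.json --execute
{code}
Afterwards, {{bin/kafka-topics.sh --describe --zookeeper 172.170.0.80:2181 --topic __consumer_offsets}} should report {{ReplicationFactor:3}} for the reassigned partitions.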


> Consumer livelock during consumer group rebalance
> -------------------------------------------------
>
>                 Key: KAFKA-6799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6799
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 1.0.0, 0.11.0.2, 1.1.0
>            Reporter: Pierre-Henri Dezanneau
>            Priority: Critical
>
> We have the following environment:
> * 1 kafka cluster with 3 brokers
> * 1 topic with 3 partitions
> * 1 producer
> * 1 consumer group with 3 consumers
> From this setup, we remove one broker from the cluster, the hard way, by 
> simply killing it. Quite often, we see that the consumer group is not 
> rebalanced correctly. By that I mean that all 3 consumers stop consuming and 
> get stuck in a loop, forever.
> The thread dump shows that the consumer threads aren't blocked but run 
> forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due 
> to the {{synchronized}} keyword on the calling method. Heartbeat threads are 
> blocked, waiting for the consumer threads to release the lock. This situation 
> prevents all consumers from consuming any more record.
> We build a simple project which seems to reliably demonstrate this:
> {code:sh}
> $ mkdir -p /tmp/sandbox && cd /tmp/sandbox
> $ git clone https://github.com/phdezann/helloworld-kafka-livelock
> $ cd helloworld-kafka-livelock && ./spin.sh
> ...
> livelock detected
> {code}
> {code:sh|title=Consumer thread|borderStyle=solid}
> "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>        blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
>         at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
>         - locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x2a17> (a sun.nio.ch.Util$3)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:684)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
>         - locked <0x2a0c> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>         at 
> org.helloworld.kafka.bus.HelloWorldKafkaListener.lambda$createConsumerInDedicatedThread$0(HelloWorldKafkaListener.java:45)
>         at 
> org.helloworld.kafka.bus.HelloWorldKafkaListener$$Lambda$42.1776656466.run(Unknown
>  Source:-1)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:sh|title=Heartbeat thread|borderStyle=solid}
> "kafka-coordinator-heartbeat-thread | helloWorldGroup@10728" daemon prio=5 
> tid=0x36 nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>        waiting for kafka-consumer-1@10733 to release lock on <0x2a0c> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>         at java.lang.Object.wait(Object.java:-1)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:955)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
