Imran Patel created KAFKA-3040:
----------------------------------

             Summary: Broker didn't report new data after change in leader
                 Key: KAFKA-3040
                 URL: https://issues.apache.org/jira/browse/KAFKA-3040
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.9.0.0
         Environment: Debian 3.2.54-2 x86_64 GNU/Linux
            Reporter: Imran Patel
            Priority: Critical


Recently we had an event that caused large Kafka backlogs to develop suddenly. 
This happened across multiple partitions. We noticed that after a brief 
connection loss to Zookeeper, Kafka brokers were not reporting any new data to 
our (SimpleConsumer) consumer, although the producers were enqueueing fine. 
This went on until another zk blip led to a reconfiguration, which suddenly 
caused the consumers to "see" the data. Our consumers and our monitoring tools 
did not see the offsets move during the outage window. Here is the sequence of 
events for a single partition (with logs attached below). 

The brokers are running 0.9, the producer is using library version 
kafka_2.10:0.8.2.1, and the consumer is using kafka_2.10:0.8.0 (both are Java 
programs). Our monitoring tool uses kafka-python-9.0.

Can you tell us if this could be due to a consumer bug (e.g., the libraries 
being too "old" to operate with a 0.9 broker)? Or does it look like a Kafka 
core issue? Please note that we recently upgraded the brokers to 0.9 and 
hadn't seen a similar issue prior to that.

- After a brief connection loss to Zookeeper, the partition leader (broker 9 
for partition 29 in the logs below) came back and shrank the ISR to itself. 
- Producers kept successfully sending data to Kafka, and the remaining 
replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. 
Broker 9 did NOT replicate this data; it just kept printing the ISR-shrinking 
message over and over.
- The consumer, on the other hand, reported no new data, presumably because it 
was talking to 9 and that broker was doing nothing (see the sketch after this 
list for how our reader picks its leader).
- 6 hours later, another Zookeeper blip caused the brokers to reconfigure, and 
the consumers started seeing new data. 
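
For reference, here is a minimal sketch of the fetch/leader-recovery loop our 
reader follows. This is the standard SimpleConsumer pattern, not our exact 
code; the class name, seed-broker handling, starting offset, and buffer sizes 
below are illustrative only:

import java.util.Collections;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class PartitionReaderSketch {

    // Hypothetical stand-in for com.example.project.consumer.kafka.PartitionReader.
    static final String TOPIC = "messages";
    static final int PARTITION = 29;

    public static void main(String[] args) {
        String leaderHost = "kafka009-prod.c.foo.internal"; // leader from cached metadata
        long offset = 14169556269L;
        SimpleConsumer consumer =
                new SimpleConsumer(leaderHost, 9092, 100000, 64 * 1024, "P29-Reader");
        while (true) {
            FetchRequest req = new FetchRequestBuilder()
                    .clientId("P29-Reader")
                    .addFetch(TOPIC, PARTITION, offset, 1024 * 1024)
                    .build();
            FetchResponse resp = consumer.fetch(req);
            if (resp.hasError()) {
                // Leader re-discovery happens ONLY on an explicit error code.
                if (resp.errorCode(TOPIC, PARTITION)
                        == ErrorMapping.NotLeaderForPartitionCode()) {
                    consumer.close();
                    leaderHost = findLeader(leaderHost);
                    consumer = new SimpleConsumer(leaderHost, 9092, 100000,
                            64 * 1024, "P29-Reader");
                }
                continue;
            }
            // A successful fetch with zero messages looks like "no new data",
            // so a leader that stops replicating stalls the reader silently.
            for (MessageAndOffset mo : resp.messageSet(TOPIC, PARTITION)) {
                offset = mo.nextOffset();
                // process mo.message() ...
            }
        }
    }

    // Re-query topic metadata from a known broker to find the current leader.
    static String findLeader(String seedHost) {
        SimpleConsumer md =
                new SimpleConsumer(seedHost, 9092, 100000, 64 * 1024, "leaderLookup");
        try {
            TopicMetadataResponse resp =
                    md.send(new TopicMetadataRequest(Collections.singletonList(TOPIC)));
            for (TopicMetadata tm : resp.topicsMetadata()) {
                for (PartitionMetadata pm : tm.partitionsMetadata()) {
                    if (pm.partitionId() == PARTITION && pm.leader() != null) {
                        return pm.leader().host();
                    }
                }
            }
        } finally {
            md.close();
        }
        throw new IllegalStateException("no leader for " + TOPIC + "-" + PARTITION);
    }
}

The point being: broker 9 kept answering our fetches successfully with zero 
messages (see the consumer log below), so nothing ever tripped the error path, 
and the reader never refreshed its leader metadata until the 
NotLeaderForPartitionCode error at 07:04:44.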

Broker 9:
[2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding 
ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition)
[2015-12-18 00:59:25,511] INFO New leader is 9 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking 
ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition)
[2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached 
zkVersion [472] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition)
[2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset 
14169556269. (kafka.log.Log)
[2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added 
fetcher for partitions List([[messages,61], initOffset 14178575900 to broker 
BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], 
initOffset 14156091271 to broker 
BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], 
initOffset 14135826155 to broker 
BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], 
initOffset 14157926400 to broker 
BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], 
initOffset 14169556269 to broker 
BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], 
initOffset 14175218230 to broker 
BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) 
(kafka.server.ReplicaFetcherManager)


Broker 3:
[2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed 
fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
[2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding 
ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition)
[2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed 
fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)


Broker 4:
[2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed 
fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
[2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added 
fetcher for partitions List([[messages,29], initOffset 14169556262 to broker 
BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] ) 
(kafka.server.ReplicaFetcherManager)
[2015-12-18 07:09:50,191] ERROR [ReplicaFetcherThread-0-3], Error for partition 
[messages,29] to broker 
3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)

Consumer:
2015-12-18 01:00:01.791 [P29-Reader] INFO  
com.example.utils.kafkalib.KafkaConsumer - 7 messages read from partition 29 
starting from offset 14169556262
2015-12-18 01:00:01.791 [P29-Reader] INFO  
com.example.utils.kafkalib.KafkaConsumer - 0 messages read from partition 29 
starting from offset 14169556269
2015-12-18 07:04:44.293 [P29-Reader] INFO  
com.example.utils.kafkalib.KafkaConsumer - 0 messages read from partition 29 
starting from offset 14169556269
2015-12-18 07:04:44.303 [P29-Reader] WARN  
com.example.project.consumer.kafka.PartitionReader - Error fetching data from 
the Broker:kafka009-prod.c.foo.internal Reason: NotLeaderForPartitionCode
2015-12-18 07:04:44.304 [P29-Reader] INFO  
com.example.project.consumer.kafka.PartitionData - Attempting to connectAndRead 
leader for topic: messages partition: 29
2015-12-18 07:04:44.309 [P29-Reader] INFO  
com.example.project.consumer.kafka.PartitionData - Leader for topic: messages 
partition: 29 set to kafka003-prod.c.foo.internal
2015-12-18 07:04:44.749 [P29-Reader] INFO  
com.example.utils.kafkalib.KafkaConsumer - 6514 messages read from partition 29 
starting from offset 14169556269



