Nelson Elhage created KAFKA-4358:
------------------------------------
Summary: Following a hung broker, newly elected leader is
unnecessarily slow assuming leadership because of ReplicaFetcherThread
Key: KAFKA-4358
URL: https://issues.apache.org/jira/browse/KAFKA-4358
Project: Kafka
Issue Type: Bug
Components: replication
Affects Versions: 0.10.0.1
Reporter: Nelson Elhage
Priority: Minor
When a broker handles a `LeaderAndIsr` request, the replica manager blocks
until idle replica fetcher threads have shut down. Only then does it respond
to the request and become able to service new produce requests.
If requests to a broker start blackholing (e.g. due to network failure, or due
to the broker hanging), shutting down the `ReplicaFetcherThread` can take a
long time (around 30s in my testing), blocking recovery of any partitions
previously led by that broker.
This is a very similar issue to KAFKA-612.
Instructions to reproduce/demonstrate:
Stand up three brokers and create a replicated topic:
{code}
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 \
  --partitions 1 --topic replicated.topic
{code}
Identify the leader, and (for simplicity in interpreting the event) make sure
it's not the same as the cluster controller:
{code}
bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic replicated.topic
{code}
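One way to compare the leader against the controller is to read the `/controller` znode and pull the ids out of both outputs. This is only a sketch: the helper names `controller_id` and `leader_id` are hypothetical, and the patterns assume the 0.10-era output shapes (a `"brokerid":N` field in the znode JSON, and a `Leader: N` column in the `--describe` output).

```shell
# Hypothetical helper: extract the broker id from the JSON stored at /controller.
# Assumes the JSON contains a field of the form "brokerid":N.
controller_id() {
  sed -n 's/.*"brokerid":\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Hypothetical helper: extract the leader id from `kafka-topics.sh --describe`
# output. Assumes each partition line contains "Leader: N".
leader_id() {
  sed -n 's/.*Leader: \([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Against a live cluster (not run here):
#   bin/zookeeper-shell.sh localhost:2181 get /controller | controller_id
#   bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
#     --topic replicated.topic | leader_id
```

If the two ids are equal, moving the controller or picking a different topic keeps the controller's view of the failover in the logs.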
Start a stream of produce events (with a shortened timeout so we get faster
visibility into when the cluster recovers):
{code}
echo request.timeout.ms=1000 >> config/producer.properties
bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic \
  --broker-list localhost:9092 --producer.config $(pwd)/config/producer.properties
{code}
Now SIGSTOP the leader (3, in my example):
{code}
kill -STOP $(pgrep -f server3.properties)
{code}
The producer will log errors for about 30 seconds, and then recover. However,
if we read logs, we'll see (excerpting key log lines from `state-change.log`):
{code}
[2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader
LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to
broker 2 for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request
correlationId 2 from controller 2 epoch 8 starting the become-leader transition
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition
[replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3
(state.change.logger)
[2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition
[replicated.topic,0] state from OnlinePartition to OfflinePartition
(state.change.logger)
[2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for
Offline partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition
[replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2
(state.change.logger)
[2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader
LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to
broker 2 for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request
PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2],
zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 11
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request
correlationId 18 from controller 1 epoch 11 starting the become-leader
transition for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of
become-leader request from controller 1 epoch 11 with correlation id 18 for
partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request
correlationId 18 from controller 1 epoch 11 for the become-leader transition
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:56,058] TRACE Controller 1 epoch 11 received response
{error_code=0,partitions=[{topic=replicated.topic,partition=0,error_code=0}]}
for a request sent to broker qa-dev1.northwest.stripe.io:9093 (id: 2 rack:
null) (state.change.logger)
{code}
Note the ~23s pause between broker 2 logging completion of the LeaderAndIsr
request, and the controller logging receipt. If we now look at broker 2
specifically, we find:
{code}
[2016-10-28 20:48:33,025] INFO [ReplicaFetcherManager on broker 2] Removed
fetcher for partitions replicated.topic-0 (kafka.server.ReplicaFetcherManager)
[2016-10-28 20:48:33,026] INFO [ReplicaFetcherThread-0-3], Shutting down
(kafka.server.ReplicaFetcherThread)
[2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Stopped
(kafka.server.ReplicaFetcherThread)
[2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Shutdown completed
(kafka.server.ReplicaFetcherThread)
{code}
These timestamps exactly match the "missing" seconds. They also align with the
first successful publish from my publish job, which happened at
`2016-10-28 20:48:57.555`.
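The size of the gap can be checked directly from the two `ReplicaFetcherThread` timestamps above. A minimal sketch, assuming both timestamps fall on the same day; the helper name `to_ms` is hypothetical:

```shell
# Hypothetical helper: convert a log time "HH:MM:SS,mmm" to milliseconds
# since midnight.
to_ms() {
  echo "$1" | awk -F'[:,]' '{ print (($1 * 60 + $2) * 60 + $3) * 1000 + $4 }'
}

start=$(to_ms "20:48:33,026")   # "Shutting down"
end=$(to_ms "20:48:56,055")     # "Shutdown completed"
gap_ms=$((end - start))
echo "fetcher shutdown took ${gap_ms} ms"   # prints: fetcher shutdown took 23029 ms
```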
Aggressively dropping `request.timeout.ms` speeds up this failover process, but
it seems that we should be able to recover those ~25s of unavailability without
having to drop any additional timeouts, by either interrupting that thread
somehow or not having to block on its shutdown.