Chris Curtin created KAFKA-783:
----------------------------------

             Summary: Preferred replica assignment on leader failure may not be correct
                 Key: KAFKA-783
                 URL: https://issues.apache.org/jira/browse/KAFKA-783
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 0.8
         Environment: $ uname -a
Linux vrd01.atlnp1 2.6.32-279.el6.x86_64 #1 SMP Fri Jun 22 12:19:21 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux
$ java -version
java version "1.6.0_25"
Java(TM) SE Runtime Environment (build 1.6.0_25-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.0-b11, mixed mode)

Kafka 0.8.0 loaded from HEAD on 1/29/2013
            Reporter: Chris Curtin
            Assignee: Neha Narkhede


Based on an email thread in the user group, Neha asked me to submit this.

Original question:

"> I ran another test, again starting with a full cluster and all partitions
> had a full set of copies. When I stop the broker which was leader for 9 of
> the 10 partitions, the leaders were all elected on one machine instead of
> the set of 3. Should the leaders have been better spread out? Also the
> copies weren’t fully populated either."

Neha:

"For problem 2, we always try to make the preferred replica (1st replica in
the list of all replicas for a partition) the leader, if it is available.
We intended to spread the preferred replica for all partitions for a topic
evenly across the brokers. If this is not happening, we need to look into
it. Please can you file a bug and describe your test case there?"
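
For reference, the rule Neha describes can be written down as a small check. This is only a sketch of the rule as I read it, not Kafka's controller code: the class and method names (`PreferredLeaderSketch`, `expectedLeader`) are made up for illustration, and falling back to the next replica in the list when the preferred one is down is my assumption, since the quote only covers the case where the preferred replica is available. The replica list is the one reported for partition 8 further down, with vrd03 shut down.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PreferredLeaderSketch {

    /**
     * Returns the first replica in the assigned list that is currently alive,
     * or null if none of the assigned replicas is alive.
     * Assumption: "next in the list" is the fallback when the preferred
     * (first) replica is unavailable.
     */
    static String expectedLeader(List<String> assignedReplicas, Set<String> liveBrokers) {
        for (String replica : assignedReplicas) {
            if (liveBrokers.contains(replica)) {
                return replica;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Brokers still running after vrd03 is shut down.
        Set<String> live = new HashSet<String>(
                Arrays.asList("vrd01.atlnp1", "vrd02.atlnp1", "vrd04.atlnp1"));
        // Assigned replica list reported for partition 8.
        List<String> partition8 = Arrays.asList("vrd03.atlnp1", "vrd02.atlnp1", "vrd04.atlnp1");
        System.out.println(expectedLeader(partition8, live)); // prints vrd02.atlnp1
    }
}
```

Under this reading, the expected leader for a partition depends only on the order of its assigned replica list and on which brokers are up, which is what the listings below are compared against.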

Configuration:
4 node cluster
1 topic with 3 replicas
10 partitions: 0-9 below

Current status:

Partition: 0:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
Partition: 3:vrd03.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 4:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
Partition: 5:vrd03.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 6:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 8:vrd03.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 9:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]

Shutdown vrd03:

Partition: 0:vrd01.atlnp1 R:[ ] I:[]
Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[ ] I:[]
*Partition: 3:vrd04.atlnp1 R:[ ] I:[]
Partition: 4:vrd01.atlnp1 R:[ ] I:[]
*Partition: 5:vrd04.atlnp1 R:[ ] I:[]
Partition: 6:vrd01.atlnp1 R:[ ] I:[]
Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
*Partition: 8:vrd04.atlnp1 R:[ ] I:[]
Partition: 9:vrd01.atlnp1 R:[ ] I:[]

(* means leader changed)

Note that partitions 3, 5 and 8 were assigned new leaders.

Per an email group thread with Neha, the new leader should be assigned from the preferred replica.
So 3 should have gotten vrd02, 5 should have gotten vrd04, and 8 should have gotten vrd02 (since vrd03 was shut down). Instead 3 got vrd04, 5 got vrd04 and 8 got vrd04.

Restarting vrd03 led to:

Partition: 0:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 3:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 4:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 5:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 6:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 8:vrd04.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 9:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]

Stopping vrd01 now led to:

*Partition: 0:vrd04.atlnp1 R:[ ] I:[]
*Partition: 1:vrd04.atlnp1 R:[ ] I:[]
*Partition: 2:vrd02.atlnp1 R:[ ] I:[]
Partition: 3:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 4:vrd02.atlnp1 R:[ ] I:[]
Partition: 5:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 6:vrd04.atlnp1 R:[ ] I:[]
*Partition: 7:vrd04.atlnp1 R:[ ] I:[]
Partition: 8:vrd04.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 9:vrd04.atlnp1 R:[ ] I:[]

(* means leader changed)

So 0, 2, 4, 6 and 7 were assigned the wrong
leader (if the preferred replica is the first in the list; if it is the last in the list, 1 and 2 are wrong).

Java code:

    import java.util.ArrayList;
    import java.util.List;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.consumer.SimpleConsumer;

    // Ask one broker for the metadata of the "storm-anon" topic.
    kafka.javaapi.consumer.SimpleConsumer consumer = new SimpleConsumer("vrd04.atlnp1", 9092, 100000, 64 * 1024, "test");
    List<String> topics2 = new ArrayList<String>();
    topics2.add("storm-anon");
    TopicMetadataRequest req = new TopicMetadataRequest(topics2);
    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
    List<kafka.javaapi.TopicMetadata> data3 = resp.topicsMetadata();

    // Print one line per partition: id, leader, assigned replicas (R) and in-sync replicas (I).
    for (kafka.javaapi.TopicMetadata item : data3) {
        for (kafka.javaapi.PartitionMetadata part : item.partitionsMetadata()) {
            String replicas = "";
            String isr = "";
            for (kafka.cluster.Broker replica : part.replicas()) {
                replicas += " " + replica.host();
            }
            for (kafka.cluster.Broker replica : part.isr()) {
                isr += " " + replica.host();
            }
            System.out.println("Partition: " + part.partitionId() + ":" + part.leader().host() + " R:[ " + replicas + "] I:[" + isr + "]");
        }
    }
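
To make the "all leaders on one machine" part of the original question easier to see, the listings can be reduced to a leader count per broker. This is a rough sketch that only parses the text format printed by the code in this report; `LeaderSpreadSketch` and `leadersPerBroker` are names made up for illustration and are not part of Kafka. The input in `main` is the listing taken after stopping vrd01.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LeaderSpreadSketch {

    // Matches e.g. "Partition: 3:vrd04.atlnp1 R:[", with an optional leading "*".
    private static final Pattern LINE =
            Pattern.compile("^\\*?Partition: (\\d+):(\\S+) R:\\[");

    /** Counts how many partitions each broker leads; lines that do not match are skipped. */
    static Map<String, Integer> leadersPerBroker(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (!m.find()) {
                continue;
            }
            String leader = m.group(2);
            Integer seen = counts.get(leader);
            counts.put(leader, seen == null ? 1 : seen + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> afterStoppingVrd01 = Arrays.asList(
                "*Partition: 0:vrd04.atlnp1 R:[ ] I:[]",
                "*Partition: 1:vrd04.atlnp1 R:[ ] I:[]",
                "*Partition: 2:vrd02.atlnp1 R:[ ] I:[]",
                "Partition: 3:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]",
                "*Partition: 4:vrd02.atlnp1 R:[ ] I:[]",
                "Partition: 5:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]",
                "*Partition: 6:vrd04.atlnp1 R:[ ] I:[]",
                "*Partition: 7:vrd04.atlnp1 R:[ ] I:[]",
                "Partition: 8:vrd04.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]",
                "*Partition: 9:vrd04.atlnp1 R:[ ] I:[]");
        // prints {vrd02.atlnp1=2, vrd04.atlnp1=8}
        System.out.println(leadersPerBroker(afterStoppingVrd01));
    }
}
```

With vrd01 stopped, three brokers are still running, yet vrd04 leads 8 of the 10 partitions and vrd03 leads none.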