Just to confirm, are you running version 0.8.1.1?

Guozhang
On Mon, Jul 28, 2014 at 4:23 PM, Jad Naous <jad.na...@appdynamics.com> wrote:

Done! Thanks!

On Mon, Jul 28, 2014 at 4:16 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks Jad,

The mailing list may have blocked the attachments. I have filed a JIRA for
your issue; could you upload the logs there?

https://issues.apache.org/jira/browse/KAFKA-1561

Guozhang

On Mon, Jul 28, 2014 at 1:55 PM, Jad Naous <jad.na...@appdynamics.com> wrote:

The logs are quite large. I've sifted through them, and I'm attaching the
logs for the relevant parts where the lost message goes through the system.
Here's what the test does:

0) Start two brokers, one producer, and one consumer. The topic has 20
partitions and uses the default partitioning scheme (which seems to send
data to only a couple of partitions when the keys are null, but that
doesn't matter for this test).
1) Start a data generator sending data through Kafka continuously.
2) Start a new broker.
3) Reassign partitions:

  {"version": 1, "partitions": [
    {"topic": "EventServiceUpsertTopic", "partition": 0,  "replicas": [0, 1, 2]},
    {"topic": "EventServiceUpsertTopic", "partition": 1,  "replicas": [1, 2, 0]},
    {"topic": "EventServiceUpsertTopic", "partition": 2,  "replicas": [2, 0, 1]},
    {"topic": "EventServiceUpsertTopic", "partition": 3,  "replicas": [0, 1, 2]},
    {"topic": "EventServiceUpsertTopic", "partition": 4,  "replicas": [1, 2, 0]},
    {"topic": "EventServiceUpsertTopic", "partition": 5,  "replicas": [2, 0, 1]},
    {"topic": "EventServiceUpsertTopic", "partition": 6,  "replicas": [0, 1, 2]},
    {"topic": "EventServiceUpsertTopic", "partition": 7,  "replicas": [1, 2, 0]},
    {"topic": "EventServiceUpsertTopic", "partition": 8,  "replicas": [2, 0, 1]},
    {"topic": "EventServiceUpsertTopic", "partition": 9,  "replicas": [0, 1, 2]},
    {"topic": "EventServiceUpsertTopic", "partition": 10, "replicas": [1, 2, 0]},
    {"topic": "EventServiceUpsertTopic", "partition": 11, "replicas": [2, 0, 1]},
    {"topic": "EventServiceUpsertTopic", "partition": 12, "replicas": [0, 1, 2]},
    {"topic": "EventServiceUpsertTopic", "partition": 13, "replicas": [1, 2, 0]},
    {"topic": "EventServiceUpsertTopic", "partition": 14, "replicas": [2, 0, 1]},
    {"topic": "EventServiceUpsertTopic", "partition": 15, "replicas": [0, 1, 2]},
    {"topic": "EventServiceUpsertTopic", "partition": 16, "replicas": [1, 2, 0]},
    {"topic": "EventServiceUpsertTopic", "partition": 17, "replicas": [2, 0, 1]},
    {"topic": "EventServiceUpsertTopic", "partition": 18, "replicas": [0, 1, 2]},
    {"topic": "EventServiceUpsertTopic", "partition": 19, "replicas": [1, 2, 0]}]}

4) Wait until reassignment is complete (i.e. until
ZkUtils.getPartitionsBeingReassigned() returns an empty map).
5) Wait until all replicas are caught up (i.e. until
ZkUtils.getLeaderAndIsrForPartition() returns all brokers in the ISR for
each partition).
6) Trigger leader re-election by calling the
PreferredReplicaLeaderElectionCommand.
7) Wait until all the leaders are the preferred leaders for the partitions
according to the replica reassignment from step 3.
8) Stop the data generator.
9) Check that all the data was consumed.

You can see from producer.log that the message {"field1": ["10"],
"idField": "id-5-59"} was sent to broker 0 successfully, but the consumer
never sees it.

Thanks,
Jad.

On Mon, Jul 28, 2014 at 10:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Also, for the initial two-replica case, did you see any error/warning
logs on your producers?
Guozhang

On Mon, Jul 28, 2014 at 10:32 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Jad,

Just to clarify: you also see data loss when you created the topic with
replication factor 2, with two replicas running, and after an automatic
leader election was triggered? If that is the case, could you attach the
logs of all involved brokers here?

For your second question, KAFKA-1211 is designed to handle that case.

Guozhang

On Mon, Jul 28, 2014 at 10:18 AM, Jad Naous <jad.na...@appdynamics.com> wrote:

Guozhang,

I have actually also seen this happen when there are two replicas
initially, so this problem is not limited to one replica. The issue is
the truncation after leader election, which will also happen on the
second replica.

Coming back to your objections:

> First case: inconsistency between replicas
> 1) Currently replica 1 is the leader:
>      replica 1: m1 m2 m3
>      replica 2: m1 m2
> 2) Replica 1 fails, and replica 2 becomes the new leader and accepts
>    messages m4 and m5:
>      replica 1: m1 m2 m3
>      replica 2: m1 m2 m4 m5
> 3) Replica 1 resumes. If it does not truncate to the HW, it will still
>    retain m3, which was actually never "committed". Say leadership moves
>    to replica 1 again; we can end up with:
>      replica 1: m1 m2 m3 m6
>      replica 2: m1 m2 m4 m5

I see. So when a replica resumes, it has to truncate to the last HW it
saw before it died.
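Jad's takeaway for the first case can be checked with a tiny simulation. This is plain Python, not Kafka code; the names (resume_follower, m1, m2, ...) are invented for illustration. It shows how a resumed replica that skips the truncate-to-HW step keeps an uncommitted message and diverges permanently from the new leader:

```python
# Sketch (not Kafka code): why a resuming replica must truncate to the
# last high watermark (HW) it saw before re-fetching from the new leader.

def resume_follower(log, hw, truncate_to_hw):
    """Return the follower's log right after it resumes."""
    return log[:hw] if truncate_to_hw else list(log)

# 1) Replica 1 leads with m3 not yet replicated; both replicas saw HW = 2.
r1 = ["m1", "m2", "m3"]
r2 = ["m1", "m2"]
hw = 2

# 2) Replica 1 fails; replica 2 becomes leader and accepts m4 and m5.
r2 += ["m4", "m5"]

# 3a) Replica 1 resumes WITHOUT truncating: the logs disagree at offset 2.
bad = resume_follower(r1, hw, truncate_to_hw=False)
assert bad[2] != r2[2]          # m3 vs m4: permanent inconsistency

# 3b) Replica 1 truncates to its last HW, then catches up from the leader.
good = resume_follower(r1, hw, truncate_to_hw=True)
good += r2[len(good):]          # fetch m4 and m5 from the new leader
assert good == r2               # the replicas converge
```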
> Second case: inconsistency between server and clients
> 1) A producer sends message m3 with acks=-1:
>      replica 1: m1 m2 m3
>      replica 2: m1 m2 m3
>      replica 3: m1 m2
> 2) The response is held until all replicas also get m3. Say at this time
>    the current leader, replica 1, fails and replica 3 is re-elected. If
>    replica 3 catches up to the largest LEO, it will also get m3:
>      replica 2: m1 m2 m3
>      replica 3: m1 m2 m3
> 3) But m3 is not actually "committed" by the time replica 1 fails. When
>    the producer gets the error from replica 1's failure, it will think
>    that m3 was not successfully sent, so it retries sending m3:
>      replica 2: m1 m2 m3 m3
>      replica 3: m1 m2 m3 m3

So on failure, all nodes need to truncate to the HW. But if there's no
failure, then truncating would lose data unnecessarily. Maybe those two
scenarios need to be handled differently?

Jad.

On Mon, Jul 28, 2014 at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Jun, Jad,

I think in this case data loss can still happen. Since the replication
factor was previously one, when handling produce requests, if the server
decides that all the produced partitions have a replication factor of 1,
it will send back the response directly instead of putting the request
into purgatory, even if the current number of replicas is 2 (for details,
look at ReplicaManager.getReplicationFactorForPartition and search for
the usage of Partition.replicationFactor).
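Guozhang's point about the stale replication factor can be sketched as follows. This is a hypothetical model in plain Python, not the actual ReplicaManager logic; ack_offset and believed_rf are invented names. It shows why a leader that still believes the partition has a single replica acknowledges acks=-1 produces immediately, before the newly added second replica has the data:

```python
# Sketch (not Kafka code): a stale replication factor of 1 defeats acks=-1.

def ack_offset(leader_leo, follower_leos, believed_rf):
    """Highest offset a produce with acks=-1 is acknowledged up to."""
    if believed_rf <= 1:
        # No purgatory: with a single believed replica, every appended
        # offset counts as committed, so the ack is sent right away.
        return leader_leo
    # Otherwise the ack waits until every replica has the data.
    return min([leader_leo] + follower_leos)

# A second replica exists but is behind at offset 8; the leader is at 10.
assert ack_offset(10, [8], believed_rf=2) == 8   # correct: hold the ack
assert ack_offset(10, [8], believed_rf=1) == 10  # stale RF: acks 9 and 10
# If leadership now moves to the follower, acknowledged offsets 9 and 10
# are lost, exactly as in Jad's test.
```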
I now agree that this is not related to KAFKA-1211 but is a different,
small bug. We probably need to file another JIRA for this. But I think
after this one is fixed (which should be much easier than KAFKA-1211),
Jad's scenario should not cause data loss anymore.

Guozhang

On Sun, Jul 27, 2014 at 6:11 PM, Jad Naous <jad.na...@appdynamics.com> wrote:

So in summary, is it true to say that triggering leader re-election is
currently not a safe operation? I have been able to reproduce that
message loss pretty reliably in tests. If that is the case, isn't that an
important operation in a large cluster where nodes go up and down?

On Jul 25, 2014 10:00 PM, "Jun Rao" <jun...@gmail.com> wrote:

Actually, I don't think KAFKA-1211 will happen with just 2 replicas. When
a replica becomes a leader, it never truncates its log; only when a
replica becomes a follower does it truncate its log to the HW. So in this
particular case, the new leader will not truncate data to offset 8.

Thanks,

Jun

On Fri, Jul 25, 2014 at 3:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Jad,

Yes, in this case I think you are actually hitting KAFKA-1211. The
summary of the issue is that it takes one more fetch-request round trip
for the follower replica to advance its HW after the leader has advanced
the HW. So for your case, the whole process is like this:

1. Leader LEO is at 10, follower LEO is at 8. Both leader and follower
   know the HW is at 8.
2. The follower fetches data from the leader starting at 8; the leader
   records the follower's LEO as 8.
3. The follower gets messages 9 and 10 and appends them to its local log.
4. The follower fetches data from the leader starting at 10; the leader
   records the follower's LEO as 10. Now the leader knows the follower
   has caught up, so it advances its HW to 10 and adds the follower to
   the ISR (but the follower does not know that yet! It still thinks the
   HW is 8).
5. The leader's fetch response gets back to the follower, and only now
   does the follower learn that the HW has been updated to 10.

Now let's say there is a leader election between steps 4) and 5). In your
case it is due to preferred replica leader election, but it could also be
that the current leader fails, etc. On becoming the new leader, the
follower will truncate its log to 8, which is the HW it knows. Hence the
data loss.

The proposed solution in KAFKA-1211 will tackle this issue.
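The five steps above can be condensed into a small simulation (plain Python, not Kafka code; all variable names are illustrative): the follower's view of the HW lags one fetch round trip behind the leader's, so an election inside that window truncates messages the leader already considered committed.

```python
# Sketch of the fetch round trip described above: the follower only
# learns the new HW one fetch after the leader advances it.

leader_log = list(range(10))      # offsets 0..9, LEO = 10
follower_log = list(range(8))     # offsets 0..7, LEO = 8
leader_hw = follower_hw = 8       # step 1: both believe HW = 8

# Steps 2-3: the follower fetches from offset 8 and appends 8 and 9;
# the response carried the HW the leader knew before this fetch (still 8).
follower_log += leader_log[len(follower_log):]

# Step 4: the next fetch position (10) tells the leader the follower has
# caught up, so the leader advances its HW, but that value only travels
# back in the *next* fetch response.
leader_hw = 10

# Election between steps 4 and 5: the follower becomes leader and
# truncates to the HW it knows, losing the two committed messages.
follower_log = follower_log[:follower_hw]
assert follower_log == list(range(8))   # offsets 8 and 9 are gone
assert leader_hw - follower_hw == 2     # the one-round-trip lag window
```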
Guozhang

On Fri, Jul 25, 2014 at 2:48 PM, Jad Naous <jad.na...@appdynamics.com> wrote:

Hi Guozhang,

Yes, broker 1 is in the ISR (in fact, I wait for broker 1 to be in the
ISR before triggering election). However, I think there is something
still amiss: I still see data loss. Here are some relevant log lines from
broker 0 (different test run). The log on broker 0 is getting truncated,
losing some messages.

  [2014-07-25 10:40:02,134] [DEBUG] [kafka-request-handler-5] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,19] on broker 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is 8082. All leo's are 8111,8082

  [2014-07-25 10:40:02,134] [DEBUG] [kafka-request-handler-5] [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 1 ms

  [2014-07-25 10:40:02,134] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Reading reply sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null finished:false header:: 729,5 replyHeader:: 729,4294968217,0 request:: '/brokers/topics/EventServiceUpsertTopic/partitions/5/state,#7b22636f6e74726f6c6c65725f65706f6368223a312c226c6561646572223a312c2276657273696f6e223a312c226c65616465725f65706f6368223a332c22697372223a5b302c315d7d,3 response:: s{4294967416,4294968217,1406309966419,1406310002132,4,0,0,0,74,0,4294967416}

  [2014-07-25 10:40:02,134] [DEBUG] [kafka-processor-49917-2] [kafka.request.logger] Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 16248; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000 ms from client /127.0.0.1:50168;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0

  [2014-07-25 10:40:02,134] [DEBUG] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [kafka.utils.ZkUtils$] Conditional update of path /brokers/topics/EventServiceUpsertTopic/partitions/5/state with value {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} and expected version 3 succeeded, returning the new version: 4

  [2014-07-25 10:40:02,136] [DEBUG] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [k.controller.PartitionStateMachine] [Partition state machine on Controller 0]: After leader election, leader cache is updated to Map(
    [EventServiceUpsertTopic,18] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,4] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,7] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,17] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,16] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,5] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,10] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,1] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,13] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,9] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,8] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,11] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,6] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,12] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,2] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,3] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,14] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,19] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
    [EventServiceUpsertTopic,0] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
    [EventServiceUpsertTopic,15] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1))

  [2014-07-25 10:40:02,136] [DEBUG] [kafka-request-handler-1] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,19] on broker 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is 8082. All leo's are 8112,8082

  [2014-07-25 10:40:02,136] [DEBUG] [kafka-request-handler-1] [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 ms

  [2014-07-25 10:40:02,137] [DEBUG] [kafka-processor-49917-2] [kafka.request.logger] Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 16250; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000 ms from client /127.0.0.1:50168;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0

  [2014-07-25 10:40:02,140] [INFO ] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [kafka.controller.KafkaController] [Controller 0]: Partition [EventServiceUpsertTopic,19] completed preferred replica leader election. New leader is 1

  [2014-07-25 10:40:02,140] [INFO ] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [kafka.controller.KafkaController] [Controller 0]: Partition [EventServiceUpsertTopic,5] completed preferred replica leader election. New leader is 1

  [2014-07-25 10:40:02,142] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Got notification sessionid:0x476e9a9e9a0001

  [2014-07-25 10:40:02,142] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Got WatchedEvent state:SyncConnected type:NodeDeleted path:/admin/preferred_replica_election for sessionid 0x476e9a9e9a0001

  [2014-07-25 10:40:02,143] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Reading reply sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null finished:false header:: 730,2 replyHeader:: 730,4294968218,0 request:: '/admin/preferred_replica_election,-1 response:: null

  [2014-07-25 10:40:02,145] [INFO ] [kafka-request-handler-0] [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [EventServiceUpsertTopic,13],[EventServiceUpsertTopic,11],[EventServiceUpsertTopic,17],[EventServiceUpsertTopic,7],[EventServiceUpsertTopic,9],[EventServiceUpsertTopic,1],[EventServiceUpsertTopic,15],[EventServiceUpsertTopic,19],[EventServiceUpsertTopic,3],[EventServiceUpsertTopic,5]

  [2014-07-25 10:40:02,153] [INFO ] [kafka-request-handler-0] [kafka.log.Log] Truncating log EventServiceUpsertTopic-19 to offset 8082.

  [2014-07-25 10:40:02,159] [INFO ] [kafka-request-handler-0] [kafka.log.Log] Truncating log EventServiceUpsertTopic-5 to offset 0.

I have removed some irrelevant lines. As you can see, the log for
EventServiceUpsertTopic-19 is being truncated, losing messages that have
not yet been replicated to broker 1. I'm not sure exactly what the issue
is. Maybe new requests should be completely blocked after leader election
until the new leader catches up on the messages that have been acked but
that it has not yet received.

Thanks,
Jad.

On Fri, Jul 25, 2014 at 2:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Jad,

I double-checked the source code again, and found that the preferred
leader elector does in fact consider whether the selected replica is in
the ISR or not. This means that by the time the election is triggered,
broker 1 has been added to the ISR by broker 0.
Could you check before > step > > >> 3, is > > >> >> > > there > > >> >> > > > > any > > >> >> > > > > > > log entries on broker 0 adding broker 1 to ISR or > > updating > > >> HW > > >> >> to > > >> >> > > 975? > > >> >> > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous < > > >> >> > > > jad.na...@appdynamics.com> > > >> >> > > > > > > wrote: > > >> >> > > > > > > > > >> >> > > > > > > > Hi, > > >> >> > > > > > > > > > >> >> > > > > > > > I have a test that continuously sends messages to one > > >> >> broker, > > >> >> > > > brings > > >> >> > > > > up > > >> >> > > > > > > > another broker, and adds it as a replica for all > > >> partitions, > > >> >> > with > > >> >> > > > it > > >> >> > > > > > > being > > >> >> > > > > > > > the preferred replica for some. I have > > >> >> > > > > > auto.leader.rebalance.enable=true, > > >> >> > > > > > > > so replica election gets triggered. Data is being > > pumped > > >> to > > >> >> the > > >> >> > > old > > >> >> > > > > > > broker > > >> >> > > > > > > > all the while. It seems that some data gets lost > while > > >> >> > switching > > >> >> > > > over > > >> >> > > > > > to > > >> >> > > > > > > > the new leader. Is this a bug, or do I have something > > >> >> > > > misconfigured? > > >> >> > > > > I > > >> >> > > > > > > also > > >> >> > > > > > > > have request.required.acks=-1 on the producer. > > >> >> > > > > > > > > > >> >> > > > > > > > Here's what I think is happening: > > >> >> > > > > > > > > > >> >> > > > > > > > 1. Producer writes message to broker 0, > > >> >> > > > [EventServiceUpsertTopic,13], > > >> >> > > > > > w/ > > >> >> > > > > > > > broker 0 currently leader, with ISR=(0), so write > > returns > > >> >> > > > > successfully, > > >> >> > > > > > > > even when acks = -1. 
Correlation id 35836 > > >> >> > > > > > > > > > >> >> > > > > > > > Producer log: > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,991] [DEBUG] [dw-97 - PATCH > > >> >> > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> > > > /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] > > >> >> > > > > > > > [kafka.producer.BrokerPartitionInfo] Partition > > >> >> > > > > > > > [EventServiceUpsertTopic,13] has leader 0 > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,993] [DEBUG] [dw-97 - PATCH > > >> >> > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> > > > /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] > > >> >> > > > > > > > [k.producer.async.DefaultEventHandler] Producer sent > > >> >> messages > > >> >> > > with > > >> >> > > > > > > > correlation id 35836 for topics > > >> >> [EventServiceUpsertTopic,13] to > > >> >> > > > > broker > > >> >> > > > > > 0 > > >> >> > > > > > > on > > >> >> > > > > > > > localhost:56821 > > >> >> > > > > > > > 2. Broker 1 is still catching up > > >> >> > > > > > > > > > >> >> > > > > > > > Broker 0 Log: > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] > > >> >> [kafka-request-handler-3] > > >> >> > > > > > > > [kafka.cluster.Partition] Partition > > >> >> > [EventServiceUpsertTopic,13] > > >> >> > > > on > > >> >> > > > > > > broker > > >> >> > > > > > > > 0: Old hw for partition [EventServiceUpsertTopic,13] > is > > >> 971. > > >> >> > New > > >> >> > > hw > > >> >> > > > > is > > >> >> > > > > > > 971. 
> > >> >> > > > > > > > All leo's are 975,971 > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] > > >> >> [kafka-request-handler-3] > > >> >> > > > > > > > [kafka.server.KafkaApis] [KafkaApi-0] Produce to > local > > >> log > > >> >> in > > >> >> > 0 > > >> >> > > ms > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] > > >> >> [kafka-processor-56821-0] > > >> >> > > > > > > > [kafka.request.logger] Completed request:Name: > > >> >> > ProducerRequest; > > >> >> > > > > > Version: > > >> >> > > > > > > > 0; CorrelationId: 35836; ClientId: ; RequiredAcks: > -1; > > >> >> > > > AckTimeoutMs: > > >> >> > > > > > > 10000 > > >> >> > > > > > > > ms from client /127.0.0.1:57086 > > >> >> > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> > > > ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0 > > >> >> > > > > > > > 3. Leader election is triggered by the scheduler: > > >> >> > > > > > > > > > >> >> > > > > > > > Broker 0 Log: > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,991] [INFO ] > [kafka-scheduler-0] > > >> >> > > > > > > > [k.c.PreferredReplicaPartitionLeaderSelector] > > >> >> > > > > > > > [PreferredReplicaPartitionLeaderSelector]: Current > > >> leader 0 > > >> >> for > > >> >> > > > > > > partition [ > > >> >> > > > > > > > EventServiceUpsertTopic,13] is not the preferred > > replica. 
> > >> >> > > > Trigerring > > >> >> > > > > > > > preferred replica leader election > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,993] [DEBUG] > [kafka-scheduler-0] > > >> >> > > > > > > > [kafka.utils.ZkUtils$] Conditional update of path > > >> >> > > > > > > > > > >> /brokers/topics/EventServiceUpsertTopic/partitions/13/state > > >> >> > with > > >> >> > > > > value > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> > > > {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} > > >> >> > > > > > > > and expected version 3 succeeded, returning the new > > >> >> version: 4 > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,994] [DEBUG] > [kafka-scheduler-0] > > >> >> > > > > > > > [k.controller.PartitionStateMachine] [Partition > state > > >> >> machine > > >> >> > on > > >> >> > > > > > > > Controller 0]: After leader election, leader cache is > > >> >> updated > > >> >> > to > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> > > > Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>) > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,994] [INFO ] > [kafka-scheduler-0] > > >> >> > > > > > > > [kafka.controller.KafkaController] [Controller 0]: > > >> >> Partition [ > > >> >> > > > > > > > EventServiceUpsertTopic,13] completed preferred > replica > > >> >> leader > > >> >> > > > > > election. > > >> >> > > > > > > > New leader is 1 > > >> >> > > > > > > > 4. Broker 1 is still behind, but it sets the high > water > > >> >> mark to > > >> >> > > > > 971!!! 
> > >> >> > > > > > > > > > >> >> > > > > > > > Broker 1 Log: > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:26,999] [INFO ] > > >> >> [kafka-request-handler-6] > > >> >> > > > > > > > [kafka.server.ReplicaFetcherManager] > > >> >> [ReplicaFetcherManager on > > >> >> > > > > broker > > >> >> > > > > > 1] > > >> >> > > > > > > > Removed fetcher for partitions > > >> [EventServiceUpsertTopic,13] > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:27,000] [DEBUG] > > >> >> [kafka-request-handler-6] > > >> >> > > > > > > > [kafka.cluster.Partition] Partition > > >> >> > [EventServiceUpsertTopic,13] > > >> >> > > > on > > >> >> > > > > > > broker > > >> >> > > > > > > > 1: Old hw for partition [EventServiceUpsertTopic,13] > is > > >> 970. > > >> >> > New > > >> >> > > hw > > >> >> > > > > is > > >> >> > > > > > > -1. > > >> >> > > > > > > > All leo's are -1,971 > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:27,098] [DEBUG] > > >> >> [kafka-request-handler-3] > > >> >> > > > > > > > [kafka.server.KafkaApis] [KafkaApi-1] Maybe update > > >> >> partition > > >> >> > HW > > >> >> > > > due > > >> >> > > > > to > > >> >> > > > > > > > fetch request: Name: FetchRequest; Version: 0; > > >> >> CorrelationId: > > >> >> > 1; > > >> >> > > > > > > ClientId: > > >> >> > > > > > > > ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 > > ms; > > >> >> > > MinBytes: > > >> >> > > > 1 > > >> >> > > > > > > bytes; > > >> >> > > > > > > > RequestInfo: [EventServiceUpsertTopic,13] -> > > >> >> > > > > > > > PartitionFetchInfo(971,1048576), <Snipped> > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:27,098] [DEBUG] > > >> >> [kafka-request-handler-3] > > >> >> > > > > > > > [kafka.cluster.Partition] Partition > > >> >> > [EventServiceUpsertTopic,13] > > >> >> > > > on > > >> >> > > > > > > broker > > >> >> > > > > > > > 1: Recording follower 0 position 971 for partition [ > > >> >> > > > > > > > 
EventServiceUpsertTopic,13]. > > >> >> > > > > > > > > > >> >> > > > > > > > [2014-07-24 14:44:27,100] [DEBUG] > > >> >> [kafka-request-handler-3] > > >> >> > > > > > > > [kafka.cluster.Partition] Partition > > >> >> > [EventServiceUpsertTopic,13] > > >> >> > > > on > > >> >> > > > > > > broker > > >> >> > > > > > > > 1: Highwatermark for partition > > >> [EventServiceUpsertTopic,13] > > >> >> > > updated > > >> >> > > > > to > > >> >> > > > > > > 971 > > >> >> > > > > > > > 5. Consumer is none the wiser. All data that was in > > >> offsets > > >> >> > > 972-975 > > >> >> > > > > > > doesn't > > >> >> > > > > > > > show up! > > >> >> > > > > > > > > > >> >> > > > > > > > I tried this with 2 initial replicas, and adding a > 3rd > > >> >> which is > > >> >> > > > > > supposed > > >> >> > > > > > > to > > >> >> > > > > > > > be the leader for some new partitions, and this > problem > > >> also > > >> >> > > > happens > > >> >> > > > > > > there. > > >> >> > > > > > > > The log on the old leader gets truncated to the > offset > > on > > >> >> the > > >> >> > new > > >> >> > > > > > leader. > > >> >> > > > > > > > What's the solution? Can I make a new broker leader > for > > >> >> > > partitions > > >> >> > > > > that > > >> >> > > > > > > are > > >> >> > > > > > > > currently active without losing data? > > >> >> > > > > > > > > > >> >> > > > > > > > Thanks, > > >> >> > > > > > > > Jad. 
--
*Jad Naous* | Engineering | AppDynamics
<http://www.appdynamics.com>

--
-- Guozhang