Also, for the initial two-replica case, did you see any error/warning logs on your producers?
Guozhang

On Mon, Jul 28, 2014 at 10:32 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Jad,
>
> Just to clarify: you also saw data loss when you created the topic with
> replication factor 2, with both replicas running, and after an auto leader
> election was triggered? If that is the case, could you attach the logs of
> all the involved brokers here?
>
> For your second question, KAFKA-1211 is designed to handle that case.
>
> Guozhang
>
> On Mon, Jul 28, 2014 at 10:18 AM, Jad Naous <jad.na...@appdynamics.com> wrote:
>
> > Guozhang,
> >
> > I have actually also seen this happen when there are two replicas
> > initially, so this problem is not limited to one replica. The issue is
> > the truncation after leader election, which will also happen on the
> > second replica.
> >
> > Coming back to your objections:
> >
> > > First case: inconsistency between replicas
> > > 1) Currently replica 1 is the leader:
> > >    replica 1: m1 m2 m3
> > >    replica 2: m1 m2
> > > 2) Replica 1 fails, and replica 2 becomes the new leader and accepts
> > >    messages m4 and m5:
> > >    replica 1: m1 m2 m3
> > >    replica 2: m1 m2 m4 m5
> > > 3) Replica 1 resumes and does not truncate to the HW, so it still has
> > >    m3, which was never actually "committed". Say the leader moves to
> > >    replica 1 again; we can end up with:
> > >    replica 1: m1 m2 m3 m6
> > >    replica 2: m1 m2 m4 m5
> >
> > I see. So when a replica resumes, it has to truncate to the last HW it
> > saw before it died.
> >
> > > Second case: inconsistency between server and clients
> > > 1) A producer sends message m3 with ack=-1:
> > >    replica 1: m1 m2 m3
> > >    replica 2: m1 m2 m3
> > >    replica 3: m1 m2
> > > 2) The response is held until all replicas also get m3. Say the current
> > >    leader, replica 1, fails at this time and replica 3 is re-elected.
> > >    If replica 3 catches up to the largest LEO, it will also get m3:
> > >    replica 2: m1 m2 m3
> > >    replica 3: m1 m2 m3
> > > 3) But m3 is not actually "committed" by the time replica 1 fails; when
> > >    the producer gets the error upon replica 1's failure, it will think
> > >    m3 was not successfully sent, and so it retries sending m3:
> > >    replica 2: m1 m2 m3 m3
> > >    replica 3: m1 m2 m3 m3
> >
> > So on failure, all nodes need to truncate to the HW. But if there's no
> > failure, then truncating would lose data unnecessarily. Maybe those two
> > scenarios need to be handled differently?
> >
> > Jad.
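A minimal sketch of the first case above (hypothetical Scala, not the Kafka source): if a resuming replica skips the truncate-to-HW step, the two logs end up holding different messages at the same offsets.

    // Simulates "First case: inconsistency between replicas".
    object DivergenceSketch extends App {
      val hw = 2                            // last HW both replicas saw
      var r1 = Vector("m1", "m2", "m3")     // old leader; m3 was never committed
      var r2 = Vector("m1", "m2")           // follower

      // Replica 1 fails; replica 2 becomes leader and accepts m4 and m5.
      r2 = r2 ++ Vector("m4", "m5")

      // Replica 1 resumes WITHOUT truncating, regains leadership, appends m6.
      r1 = r1 :+ "m6"
      println(s"divergent logs: r1=$r1 r2=$r2")  // m3,m6 vs m4,m5 at offsets 2-3

      // The rule under discussion: truncate to the last known HW on resume.
      val r1Truncated = Vector("m1", "m2", "m3").take(hw)
      println(s"with truncate-to-HW: r1=$r1Truncated")  // Vector(m1, m2)
    }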
> >
> > On Mon, Jul 28, 2014 at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Jun, Jad,
> > >
> > > I think data loss can still happen in this case, since the replication
> > > factor was previously one: when handling produce requests, if the
> > > server decides that all the produced partitions have a replication
> > > factor of 1, it will send back the response directly instead of
> > > putting the request into purgatory, even if the number of replicas is
> > > currently 2 (for details, look at
> > > ReplicaManager.getReplicationFactorForPartition and search for the
> > > usages of Partition.replicationFactor).
> > >
> > > I now agree that this is not related to KAFKA-1211 but is a different,
> > > small bug. We probably need to file another JIRA for it. But I think
> > > once it is fixed (which should be much easier than KAFKA-1211), Jad's
> > > scenario should not cause data loss anymore.
> > >
> > > Guozhang
> > >
> > > On Sun, Jul 27, 2014 at 6:11 PM, Jad Naous <jad.na...@appdynamics.com> wrote:
> > >
> > > > So in summary, is it true to say that triggering leader re-election
> > > > is currently not a safe operation? I have been able to reproduce the
> > > > message loss pretty reliably in tests. If that is the case, isn't it
> > > > an important operation in a large cluster where nodes go up and down?
> > > >
> > > > On Jul 25, 2014 10:00 PM, "Jun Rao" <jun...@gmail.com> wrote:
> > > >
> > > > > Actually, I don't think KAFKA-1211 will happen with just 2
> > > > > replicas. When a replica becomes a leader, it never truncates its
> > > > > log; only when a replica becomes a follower does it truncate its
> > > > > log to the HW. So in this particular case, the new leader will not
> > > > > truncate data to offset 8.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Jul 25, 2014 at 3:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jad,
> > > > > >
> > > > > > Yes. In this case I think you are actually hitting KAFKA-1211.
> > > > > > The summary of the issue is that it takes one more fetch-request
> > > > > > round trip for the follower replica to advance its HW after the
> > > > > > leader has advanced the HW. So for your case, the whole process
> > > > > > is like this:
> > > > > >
> > > > > > 1. The leader's LEO is at 10, the follower's LEO at 8. Both the
> > > > > >    leader and the follower know the HW is at 8.
> > > > > > 2. The follower fetches from the leader starting at offset 8;
> > > > > >    the leader records the follower's position as 8.
> > > > > > 3. The follower gets messages 9 and 10 and appends them to its
> > > > > >    local log.
> > > > > > 4. The follower fetches from the leader starting at offset 10;
> > > > > >    the leader records the follower's position as 10. Now the
> > > > > >    leader knows the follower has caught up, so it advances its
> > > > > >    HW to 10 and adds the follower to the ISR (but the follower
> > > > > >    does not know that yet! It still thinks the HW is 8).
> > > > > > 5. The leader's fetch response gets back to the follower, and
> > > > > >    only now does the follower learn that the HW has been updated
> > > > > >    to 10.
> > > > > >
> > > > > > Now say there is a leader election between steps 4) and 5). In
> > > > > > your case it is due to preferred leader election, but it could
> > > > > > also happen because the current leader fails, etc. On becoming
> > > > > > the new leader, the follower will truncate its data to 8, which
> > > > > > is the HW it knows. Hence the data loss.
> > > > > >
> > > > > > The proposed solution in KAFKA-1211 will tackle this issue.
> > > > > >
> > > > > > Guozhang
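A minimal sketch of the round-trip lag in steps 1-5 above (hypothetical Scala, not the Kafka source): the leader's HW advances one fetch earlier than the follower learns of it, and the election lands exactly in that window.

    // Simulates the HW propagation lag behind KAFKA-1211.
    object HwLagSketch extends App {
      case class Replica(var leo: Long, var hw: Long)

      val leader   = Replica(leo = 10, hw = 8)
      val follower = Replica(leo = 8, hw = 8)

      var followerPosOnLeader = 8L      // step 2: fetch from offset 8
      follower.leo = 10                 // step 3: append messages 9 and 10

      // Step 4: fetch from offset 10; the leader sees the follower caught up
      // and advances its HW, but the response has not reached the follower.
      followerPosOnLeader = 10
      leader.hw = math.min(leader.leo, followerPosOnLeader)  // HW -> 10

      // A leader election HERE (before step 5) leaves the follower believing
      // HW = 8, so a role change truncates offsets 9-10: acked data is lost.
      println(s"leader HW=${leader.hw}, follower's known HW=${follower.hw}")
    }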
> > > > > >
> > > > > > On Fri, Jul 25, 2014 at 2:48 PM, Jad Naous <jad.na...@appdynamics.com> wrote:
> > > > > >
> > > > > > > Hi Guozhang,
> > > > > > >
> > > > > > > Yes, broker 1 is in the ISR (in fact, I wait for broker 1 to be
> > > > > > > in the ISR before triggering the election). However, I think
> > > > > > > something is still amiss: I still see data loss. Here are some
> > > > > > > relevant log lines from broker 0 (different test run). The log
> > > > > > > on broker 0 is getting truncated, losing some messages.
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [kafka-request-handler-5] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,19] on broker 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is 8082. All leo's are 8111,8082
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [kafka-request-handler-5] [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 1 ms
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Reading reply sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null finished:false header:: 729,5 replyHeader:: 729,4294968217,0 request:: '/brokers/topics/EventServiceUpsertTopic/partitions/5/state,#7b22636f6e74726f6c6c65725f65706f6368223a312c226c6561646572223a312c2276657273696f6e223a312c226c65616465725f65706f6368223a332c22697372223a5b302c315d7d,3 response:: s{4294967416,4294968217,1406309966419,1406310002132,4,0,0,0,74,0,4294967416}
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [kafka-processor-49917-2] [kafka.request.logger] Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 16248; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000 ms from client /127.0.0.1:50168 ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [kafka.utils.ZkUtils$] Conditional update of path /brokers/topics/EventServiceUpsertTopic/partitions/5/state with value {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} and expected version 3 succeeded, returning the new version: 4
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,136] [DEBUG] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [k.controller.PartitionStateMachine] [Partition state machine on Controller 0]: After leader election, leader cache is updated to Map(
> > > > > > >   [EventServiceUpsertTopic,18] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,4] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,7] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,17] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,16] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,5] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,10] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,1] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,13] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,9] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,8] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,11] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,6] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,12] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,2] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,3] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,14] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,19] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,0] -> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > > > >   [EventServiceUpsertTopic,15] -> (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1))
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,136] [DEBUG] [kafka-request-handler-1] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,19] on broker 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is 8082. All leo's are 8112,8082
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,136] [DEBUG] [kafka-request-handler-1] [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 ms
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,137] [DEBUG] [kafka-processor-49917-2] [kafka.request.logger] Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 16250; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000 ms from client /127.0.0.1:50168 ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,140] [INFO ] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [kafka.controller.KafkaController] [Controller 0]: Partition [EventServiceUpsertTopic,19] completed preferred replica leader election. New leader is 1
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,140] [INFO ] [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] [kafka.controller.KafkaController] [Controller 0]: Partition [EventServiceUpsertTopic,5] completed preferred replica leader election. New leader is 1
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,142] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Got notification sessionid:0x476e9a9e9a0001
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,142] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Got WatchedEvent state:SyncConnected type:NodeDeleted path:/admin/preferred_replica_election for sessionid 0x476e9a9e9a0001
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,143] [DEBUG] [main-SendThread(localhost:49893)] [org.apache.zookeeper.ClientCnxn] Reading reply sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null finished:false header:: 730,2 replyHeader:: 730,4294968218,0 request:: '/admin/preferred_replica_election,-1 response:: null
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,145] [INFO ] [kafka-request-handler-0] [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [EventServiceUpsertTopic,13],[EventServiceUpsertTopic,11],[EventServiceUpsertTopic,17],[EventServiceUpsertTopic,7],[EventServiceUpsertTopic,9],[EventServiceUpsertTopic,1],[EventServiceUpsertTopic,15],[EventServiceUpsertTopic,19],[EventServiceUpsertTopic,3],[EventServiceUpsertTopic,5]
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,153] [INFO ] [kafka-request-handler-0] [kafka.log.Log] Truncating log EventServiceUpsertTopic-19 to offset 8082.
> > > > > > >
> > > > > > > [2014-07-25 10:40:02,159] [INFO ] [kafka-request-handler-0] [kafka.log.Log] Truncating log EventServiceUpsertTopic-5 to offset 0.
> > > > > > >
> > > > > > > I have removed some irrelevant lines. As you can see, the log
> > > > > > > for EventServiceUpsertTopic-19 is being truncated, losing
> > > > > > > messages that have not yet been replicated to broker 1. I'm not
> > > > > > > sure exactly what the issue is. Maybe new requests should be
> > > > > > > completely blocked after leader election until the new leader
> > > > > > > has caught up on all messages that were acked but that it has
> > > > > > > not yet received.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jad.
> > > > > > >
> > > > > > > On Fri, Jul 25, 2014 at 2:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Jad,
> > > > > > > >
> > > > > > > > I double-checked the source code again and found that the
> > > > > > > > preferred leader elector does in fact consider whether the
> > > > > > > > selected replica is in the ISR. This means that by the time
> > > > > > > > the election is triggered, broker 1 has been added to the
> > > > > > > > ISR by broker 0. Could you check whether, before step 3,
> > > > > > > > there are any log entries on broker 0 adding broker 1 to the
> > > > > > > > ISR or updating the HW to 975?
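The ISR check described here can be sketched like this (hypothetical Scala with illustrative names, not the actual elector): the preferred replica, the head of the assignment list, is only chosen if it is currently in the ISR. Note that being in the ISR does not mean its log already holds everything the current leader has acked, which is why the election can still truncate.

    object PreferredElectionSketch extends App {
      // Pick the preferred replica only if the ISR contains it.
      def selectPreferredLeader(assigned: Seq[Int], isr: Set[Int]): Option[Int] =
        assigned.headOption.filter(isr.contains)

      println(selectPreferredLeader(Seq(1, 0), Set(0)))     // None: broker 1 not in ISR yet
      println(selectPreferredLeader(Seq(1, 0), Set(0, 1)))  // Some(1): election proceeds
    }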
> > > > > > > >
> > > > > > > > On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous <jad.na...@appdynamics.com> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I have a test that continuously sends messages to one
> > > > > > > > > broker, brings up another broker, and adds it as a replica
> > > > > > > > > for all partitions, with it being the preferred replica
> > > > > > > > > for some. I have auto.leader.rebalance.enable=true, so
> > > > > > > > > replica election gets triggered. Data is being pumped to
> > > > > > > > > the old broker all the while. It seems that some data gets
> > > > > > > > > lost while switching over to the new leader. Is this a
> > > > > > > > > bug, or do I have something misconfigured? I also have
> > > > > > > > > request.required.acks=-1 on the producer.
> > > > > > > > >
> > > > > > > > > Here's what I think is happening:
> > > > > > > > >
> > > > > > > > > 1. The producer writes a message to broker 0,
> > > > > > > > > [EventServiceUpsertTopic,13], with broker 0 currently the
> > > > > > > > > leader and ISR=(0), so the write returns successfully,
> > > > > > > > > even when acks = -1. Correlation id 35836.
> > > > > > > > >
> > > > > > > > > Producer log:
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,991] [DEBUG] [dw-97 - PATCH /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] [kafka.producer.BrokerPartitionInfo] Partition [EventServiceUpsertTopic,13] has leader 0
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,993] [DEBUG] [dw-97 - PATCH /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] [k.producer.async.DefaultEventHandler] Producer sent messages with correlation id 35836 for topics [EventServiceUpsertTopic,13] to broker 0 on localhost:56821
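Why the write in step 1 succeeds with acks=-1: the leader only waits for the replicas in the *current* ISR, and here the ISR is just (0). A minimal sketch of that decision (hypothetical Scala, not the Kafka source):

    object AckSketch extends App {
      // With requiredAcks = -1, the request is satisfied once every member of
      // the current ISR has the message -- regardless of the assignment size.
      def satisfied(requiredAcks: Int, isr: Set[Int], acked: Set[Int]): Boolean =
        if (requiredAcks == -1) isr.subsetOf(acked)
        else acked.size >= requiredAcks

      println(satisfied(-1, isr = Set(0), acked = Set(0)))     // true: broker 0 alone "commits"
      println(satisfied(-1, isr = Set(0, 1), acked = Set(0)))  // false: must wait for broker 1
    }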
> > > > > > > > >
> > > > > > > > > 2. Broker 1 is still catching up.
> > > > > > > > >
> > > > > > > > > Broker 0 log:
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New hw is 971. All leo's are 975,971
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 ms
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] [kafka-processor-56821-0] [kafka.request.logger] Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000 ms from client /127.0.0.1:57086 ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
> > > > > > > > >
> > > > > > > > > 3. Leader election is triggered by the scheduler.
> > > > > > > > >
> > > > > > > > > Broker 0 log:
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,991] [INFO ] [kafka-scheduler-0] [k.c.PreferredReplicaPartitionLeaderSelector] [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for partition [EventServiceUpsertTopic,13] is not the preferred replica. Trigerring preferred replica leader election
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,993] [DEBUG] [kafka-scheduler-0] [kafka.utils.ZkUtils$] Conditional update of path /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} and expected version 3 succeeded, returning the new version: 4
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,994] [DEBUG] [kafka-scheduler-0] [k.controller.PartitionStateMachine] [Partition state machine on Controller 0]: After leader election, leader cache is updated to Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>)
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,994] [INFO ] [kafka-scheduler-0] [kafka.controller.KafkaController] [Controller 0]: Partition [EventServiceUpsertTopic,13] completed preferred replica leader election. New leader is 1
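The "All leo's are 975,971 ... New hw is 971" line in step 2 above reflects the invariant that a leader's HW is the minimum LEO across the replicas it tracks. A minimal sketch (hypothetical Scala, not the Kafka source):

    object HighWatermarkSketch extends App {
      // The leader can only expose offsets that every tracked replica has.
      def highWatermark(leos: Seq[Long]): Long = leos.min

      println(highWatermark(Seq(975L, 971L)))  // 971, matching the broker 0 log
      // After the switch, the new leader's view tops out at 971, so the old
      // leader, now a follower, truncates 972-975 even though they were acked.
    }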
> > > > > > > > >
> > > > > > > > > 4. Broker 1 is still behind, but it sets the high watermark to 971!!!
> > > > > > > > >
> > > > > > > > > Broker 1 log:
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:26,999] [INFO ] [kafka-request-handler-6] [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [EventServiceUpsertTopic,13]
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:27,000] [DEBUG] [kafka-request-handler-6] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw is -1. All leo's are -1,971
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3] [kafka.server.KafkaApis] [KafkaApi-1] Maybe update partition HW due to fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [EventServiceUpsertTopic,13] -> PartitionFetchInfo(971,1048576), <Snipped>
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker 1: Recording follower 0 position 971 for partition [EventServiceUpsertTopic,13].
> > > > > > > > >
> > > > > > > > > [2014-07-24 14:44:27,100] [DEBUG] [kafka-request-handler-3] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker 1: Highwatermark for partition [EventServiceUpsertTopic,13] updated to 971
> > > > > > > > >
> > > > > > > > > 5. The consumer is none the wiser. All the data that was at offsets 972-975 doesn't show up!
> > > > > > > > >
> > > > > > > > > I tried this with 2 initial replicas, adding a 3rd that is
> > > > > > > > > supposed to be the leader for some new partitions, and
> > > > > > > > > this problem also happens there: the log on the old leader
> > > > > > > > > gets truncated to the offset on the new leader. What's the
> > > > > > > > > solution? Can I make a new broker the leader for
> > > > > > > > > partitions that are currently active without losing data?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jad.
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > *Jad Naous* | Engineering | AppDynamics
> > > > > > > > > <http://www.appdynamics.com>
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > >
> > > > > > > --
> > > > > > > *Jad Naous* | Engineering | AppDynamics
> > > > > > > <http://www.appdynamics.com>
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > *Jad Naous* | Engineering | AppDynamics
> > <http://www.appdynamics.com>
>
> --
> -- Guozhang

--
-- Guozhang