Jun,

Thanks for the reply!
I am aware the HW won't advance until the in-sync replicas have
_requested_ the messages. However, I believe the issue is that the
leader has no guarantee the replicas have _received_ the fetch
response. There is no second phase to the commit. So, in the particular
case where a leader transition happens, I believe this race condition
exists (and I'm happy to be wrong here, but it looks feasible and
explains the data loss I saw):

1. Starting point: leader and replica both have messages only up to #36
2. Client produces a new message with required.acks=all
3. Leader appends message #37, but the HW is still #36, so the produce
   request is blocked
4. Replica fetches messages (the leader has RECEIVED the fetch request)
5. Leader then advances the HW to #37 and unblocks the produce request;
   the client believes the message is durable
6. PREFERRED REPLICA ELECTION BEGINS
7. Replica starts the become-leader process
8. Leader finishes sending the fetch response; the replica is only now
   seeing message #37
9. Replica throws away the fetch response from step 4 because it is now
   becoming leader (the partition has been removed from partitionMap,
   so as far as I can tell the data is just ignored)
10. Leader starts become-follower
11. Leader truncates to the replica's HW offset of #36
12. Message #37 was acked as durably committed but is now lost
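To pin down exactly which interleaving I mean, here is a toy model of
those 12 steps in Java. To be clear, none of this is real Kafka code:
the classes, fields, and offsets are invented purely to show why acking
the produce when the fetch _request_ arrives, rather than when the
follower has applied the fetch _response_, loses an acked message
across the leader switch.

import java.util.ArrayList;
import java.util.List;

public class HwRaceSketch {

    /** One broker's view of a single partition: a log plus a high watermark. */
    static class Broker {
        final String name;
        final List<String> log = new ArrayList<>();  // message at index i is offset i
        int highWatermark;                           // highest committed offset

        Broker(String name, int preloadedThrough) {
            this.name = name;
            for (int i = 0; i <= preloadedThrough; i++) {
                log.add("msg-" + i);
            }
            this.highWatermark = preloadedThrough;
        }

        int lastOffset() {
            return log.size() - 1;
        }

        void truncateTo(int offset) {
            // Drop everything past `offset`, mirroring become-follower truncation.
            while (lastOffset() > offset) {
                log.remove(log.size() - 1);
            }
        }
    }

    public static void main(String[] args) {
        // Step 1: leader and replica both have messages up to #36.
        Broker leader = new Broker("old-leader", 36);
        Broker replica = new Broker("replica", 36);

        // Steps 2-3: a produce with required.acks=all arrives; the leader
        // appends #37, but the HW stays at #36 so the produce response is held.
        leader.log.add("msg-37");

        // Steps 4-5 (the claim): the replica's fetch request arrives and the
        // leader advances the HW and acks the producer, even though the fetch
        // response carrying #37 has not been applied by the replica yet.
        leader.highWatermark = 37;
        System.out.println("producer acked; client now believes #37 is durable");

        // Steps 6-9: preferred replica election. The replica starts
        // become-leader and discards the in-flight fetch response, so #37
        // never reaches its log (nothing to model: its log still ends at #36).

        // Steps 10-11: the old leader becomes a follower and truncates to the
        // new leader's HW of #36.
        leader.truncateTo(replica.highWatermark);

        // Step 12: the acked message is gone from both copies.
        System.out.println("new leader last offset: " + replica.lastOffset()); // 36
        System.out.println("old leader last offset: " + leader.lastOffset());  // 36
    }
}

Running it ends with both logs stopping at #36 even though the producer
was acked for #37.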
For the tickets you linked:

https://issues.apache.org/jira/browse/KAFKA-3670
* There was no shutdown involved in this case, so this shouldn't be a
  factor here.

https://issues.apache.org/jira/browse/KAFKA-1211
* I've read through this, but I'm not entirely sure it addresses the
  above. I don't think it does, though. I don't see a step in the
  ticket about become-leader making a call to the old leader to get the
  latest generation snapshot?

--
Mark Smith
m...@qq.is

On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:
> Mark,
>
> Thanks for reporting this. First, a clarification. The HW is actually
> never advanced until all in-sync followers have fetched the
> corresponding message. For example, in step 2, if all follower
> replicas issue a fetch request at offset 10, it serves as an
> indication that all replicas have received messages up to offset 9.
> So, only then is the HW advanced to offset 10 (which is not
> inclusive).
>
> I think the problems that you are seeing are probably caused by two
> known issues. The first one is
> https://issues.apache.org/jira/browse/KAFKA-1211. The issue is that
> the HW is propagated asynchronously from the leader to the followers.
> If the leadership changes multiple times very quickly, what can
> happen is that a follower first truncates its data up to the HW and
> then immediately becomes the new leader. Since the follower's HW may
> not be up to date, some previously committed messages could be lost.
> The second one is https://issues.apache.org/jira/browse/KAFKA-3670.
> The issue is that controlled shutdown and leader balancing can cause
> leadership to change more than once quickly, which could expose the
> data loss problem in the first issue.
>
> The second issue has been fixed in 0.10.0. So, if you upgrade to that
> version or above, it should reduce the chance of hitting the first
> issue significantly. We are actively working on the first issue and
> hopefully it will be addressed in the next release.
>
> Jun
>
> On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <m...@qq.is> wrote:
>
> > Hey folks,
> >
> > I work at Dropbox and I was doing some maintenance yesterday, and it
> > looks like we lost some committed data during a preferred replica
> > election. As far as I understand this shouldn't happen, but I have a
> > theory and want to run it by y'all.
> >
> > Preamble:
> > * Kafka 0.9.0.1
> > * required.acks = -1 (All)
> > * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> >   require both to have the data)
> > * consumer is Kafka Connect
> > * 1400 topics, total of about 15,000 partitions
> > * 30 brokers
> >
> > I was performing some rolling restarts of brokers yesterday as part
> > of our regular DRT (disaster testing) process, and at the end that
> > always leaves many partitions that need to be failed back to the
> > preferred replica. There were about 8,000 partitions that needed
> > moving. I started the election in Kafka Manager and it worked, but
> > it looks like 4 of those 8,000 partitions experienced some
> > relatively small amount of data loss at the tail.
> >
> > From the Kafka Connect point of view, we saw a handful of these:
> >
> > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5]
> > INFO Fetch offset 67614479952 is out of range, resetting offset
> > (o.a.k.c.c.i.Fetcher:595)
> >
> > I believe that was because it asked the new leader for data and the
> > new leader had less data than the old leader. Indeed, the old leader
> > became a follower and immediately truncated:
> >
> > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log
> > goscribe.client-host_activity-21 to offset 67614479601.
> >
> > Given the above production settings I don't know why KC would ever
> > see an OffsetOutOfRange error, but this caused KC to reset to the
> > beginning of the partition. Various broker logs for the failover
> > paint the following timeline:
> > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
> >
> > My current working theory that I'd love eyes on:
> >
> > 1. Leader receives the produce request and appends to the log,
> > incrementing the LEO, but given the durability requirements the HW
> > is not incremented and the produce response is delayed (normal)
> >
> > 2. Replica sends a Fetch request to the leader as part of the normal
> > replication flow
> >
> > 3. Leader increments the HW when it STARTS to respond to the Fetch
> > request (per fetchMessages in ReplicaManager.scala), so the HW is
> > updated as soon as we've prepared messages for the response --
> > importantly, the HW is updated even though the replica has not yet
> > actually seen the messages, even given the durability settings we've
> > got
> >
> > 4. Meanwhile, Kafka Connect sends a Fetch request to the leader and
> > receives the messages below the new HW, but the messages have still
> > not actually been received by the replica
> >
> > 5. Preferred replica election begins (oh the travesty!)
> >
> > 6. Replica starts the become-leader process, and makeLeader removes
> > this partition from partitionMap, which means that when the response
> > finally comes in, we ignore it (we discard the messages the old
> > leader committed)
> >
> > 7. Old leader starts the become-follower process and truncates to
> > the HW of the new leader, i.e. the old leader has now thrown away
> > data it had committed and given out moments ago
> >
> > 8. Kafka Connect sends a Fetch request to the new leader, but its
> > offset is now greater than the HW of the new leader, so we get the
> > OffsetOutOfRange error and restart
> >
> > Can someone tell me whether or not this is plausible? If it is, is
> > there a known issue/bug filed for it?
> > I'm not exactly sure what the solution is, but it does seem
> > unfortunate that a normal operation (a leader election with both
> > brokers alive and well) can result in the loss of committed
> > messages.
> >
> > And, if my theory doesn't hold, can anybody explain what happened?
> > I'm happy to provide more logs or whatever.
> >
> > Thanks!
> >
> > --
> > Mark Smith
> > m...@qq.is
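P.S. In case it helps anyone reproduce this: the durability setup from
the preamble above boils down to roughly the following on the producer
side. This is only a sketch, not our actual client code; the broker
address and payload are placeholders, the topic name is the one from
the truncation log line, and the config keys shown are the standard
ones (acks on the producer, min.insync.replicas on the topic/broker).

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DurabilitySketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092");   // placeholder
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // acks=all (same as -1): the leader should not ack until the full ISR
        // has the message. The topic carries min.insync.replicas=2 with
        // replication factor 2, so "full ISR" means both replicas.
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            RecordMetadata md = producer.send(
                    new ProducerRecord<>("goscribe.client-host_activity", "payload"))
                .get();
            // Once get() returns, the client treats this offset as durable on
            // both replicas -- which is the guarantee that appears to break in
            // the scenario described above.
            System.out.println("acked at offset " + md.offset());
        }
    }
}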