Jun,
I see what's going on -- the leader doesn't update its HW as soon as the follower has requested the messages; it updates when the follower requests the _next_ messages. I.e., it infers from the fact that the follower requested offset 38 that everything <= 37 is durable.
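To make that rule concrete, here is a minimal sketch (hypothetical Java, not the actual broker code in ReplicaManager.scala) of how a leader could derive its high watermark from follower fetch offsets: a fetch at offset N acknowledges everything below N, so the HW is the minimum fetch offset seen across the in-sync followers.

    // Toy model of the HW rule: a follower fetch at offset N means the follower
    // already holds everything below N, so the leader advances the high watermark
    // to the minimum fetch offset across the in-sync followers (capped at its LEO).
    import java.util.HashMap;
    import java.util.Map;

    public class HighWatermarkSketch {
        long logEndOffset = 38;   // leader has appended offsets 0..37
        long highWatermark = 37;  // offsets < 37 are committed so far
        final Map<String, Long> fetchOffsets = new HashMap<>();

        void onFollowerFetch(String replica, long fetchOffset) {
            fetchOffsets.put(replica, fetchOffset);
            long minFetch = fetchOffsets.values().stream()
                    .mapToLong(Long::longValue).min().getAsLong();
            highWatermark = Math.min(Math.max(highWatermark, minFetch), logEndOffset);
        }

        public static void main(String[] args) {
            HighWatermarkSketch leader = new HighWatermarkSketch();
            leader.onFollowerFetch("replica-2", 37); // fetch at 37: nothing new acknowledged
            System.out.println(leader.highWatermark); // 37 -- an acks=all produce of #37 stays blocked
            leader.onFollowerFetch("replica-2", 38); // fetch at 38: offset 37 is now on the follower
            System.out.println(leader.highWatermark); // 38 -- the produce request can be answered
        }
    }

The fetch at offset 38 is the leader's only signal that offset 37 actually reached the follower, which is the distinction Jun spells out in the quoted thread below.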
This makes sense, and it means my understanding was wrong and the race I described wasn't an issue. Thanks for helping clear that up.

Unfortunately, that means the data loss we saw is still unexplained. I can recreate the conditions that led to it and see if I can reproduce the problem; if so, I'll update this thread again.

--
Mark Smith
m...@qq.is

On Mon, Nov 21, 2016, at 06:59 PM, Jun Rao wrote:
> Hi, Mark,
>
> So you did the manual leader election after the cluster is stabilized (i.e., all replicas are in sync)? Then it may not be related to the issue that I described.
>
> If there is just a single leadership change, what you described shouldn't happen by design. I modified your steps to the following according to the design, and I am not sure how the message can be lost.
>
> 1. Starting point: Leader and Replica both only have up to message #36, HW is at 37
> 2. Client produces a new message with required.acks=all at offset 37
> 3. The produce request is blocked
> 4. Replica fetches messages at offset 37
> 5. Leader's HW still at 37 and the produce request still blocked
> 5.1 Replica receives the message at 37 and appends it to its local log
> 5.2 Replica fetches messages at offset 38
> 5.3 Leader's HW moves to 38 and the produce request is unblocked
> 5.4 Replica's HW still at 37
> 6. PREFERRED REPLICA ELECTION BEGINS
> 7. Replica becomes leader; no truncation is done; the message at offset 37 is preserved; HW still at 37
> 8. Connect issues a fetch request at 38 to the replica and gets an empty response instead of an OffsetOutOfRangeException, since the log end offset is at 38
> 9. Leader becomes follower; truncates to its HW of 38, keeping the message at offset 37
> 10. Leader starts fetching from offset 38
> 11. Replica moves HW to 38
> 12. Message #37 is preserved in both replicas and is not lost
>
> BTW, do you have unclean leader election disabled? Is this issue reproducible? If so, we can enable some debug-level logging to see what's causing this. Right now, I am also not sure whether this is a broker-side issue or a consumer-side issue.
>
> Thanks,
>
> Jun
>
> On Mon, Nov 21, 2016 at 5:20 PM, Mark Smith <m...@qq.is> wrote:
>> Jun,
>>
>> Yeah, I probably have an off-by-one issue in the HW description. I think you could pick any number here and the problem remains -- could you read through the steps I posted and see if they logically make sense, numbers aside?
>>
>> We definitely lost data in 4 partitions of the 8,000 that were elected, and there was only a single election for each partition. We had done a rolling restart hours before, but that had finished long before the election and everything was stable. We do not allow automatic election; it's a manual process that we initiate after the cluster has stabilized.
>>
>> So in this case, I still don't think any discussion about multiple failovers is germane to the problem we saw. Each of our partitions only had a single failover, and yet 4 of them still truncated committed data.
>>
>> --
>> Mark Smith
>> m...@qq.is
>>
>> On Mon, Nov 21, 2016, at 05:12 PM, Jun Rao wrote:
>>> Hi, Mark,
>>>
>>> Hmm, committing a message at offset X is the equivalent of saying that the HW is at offset X + 1. So, in your example, if the producer publishes a new message at offset 37, this message won't be committed (i.e., the HW won't move to offset 38) until the leader sees the follower fetch from offset 38 (not offset 37). At that point, the follower would have received the message at offset 37 in the fetch response and appended it to its local log. If the follower now becomes the new leader, the message at offset 37 is preserved.
>>>
>>> The problem that I described regarding data loss can happen during a rolling restart. Suppose that you have 3 replicas A, B, and C. Let's say A is the preferred leader, but during the deployment the leader gets moved to replica B at some point and all 3 replicas are in sync. A new message is produced at offset 37 and is committed (leader's HW = 38). However, the HW in replica A is still at 37. Now we try to shut down broker B and the leader gets moved to replica C. Replica A starts to follow replica C, and it first truncates to HW 37, which removes the message at offset 37. Now the preferred leader logic kicks in and the leadership switches again to replica A. Since A no longer has the message at offset 37, and all followers copy messages from replica A, the message at offset 37 is lost.
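As an illustrative aside on the scenario just described (a toy model with made-up names, not Kafka's actual replica code): the dangerous step is that a replica, on becoming a follower, truncates to its own possibly stale HW before fetching from the new leader, which is the gap KAFKA-1211 tracks.

    // Toy walkthrough of the rolling-restart loss scenario described above.
    import java.util.ArrayList;
    import java.util.List;

    public class TruncationSketch {
        static class Replica {
            final List<String> log = new ArrayList<>(); // message at index i lives at offset i
            long highWatermark;

            // On becoming a follower, the replica only knows its local HW, which may
            // lag the old leader's HW, so committed tail messages can be cut off here.
            void becomeFollower() {
                while (log.size() > highWatermark) {
                    log.remove(log.size() - 1);
                }
            }
        }

        public static void main(String[] args) {
            Replica a = new Replica(); // replica A, the preferred replica
            for (int i = 0; i <= 37; i++) {
                a.log.add("m" + i);    // A holds offsets 0..37; #37 is committed on leader B (HW 38)
            }
            a.highWatermark = 37;      // ...but A's HW has only propagated up to 37

            a.becomeFollower();        // B shuts down, A starts following C and truncates to HW 37
            System.out.println(a.log.size()); // prints 37: the committed message at offset 37 is gone
            // The preferred-leader election then makes A the leader, so every replica
            // re-syncs from A and offset 37 is lost for good.
        }
    }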
>>> With KAFKA-3670, in the above example, when shutting down broker B, the leader will be moved directly to replica A since it's the preferred replica. So the above scenario won't happen.
>>>
>>> The more complete fix is in KAFKA-1211. The logic for getting the latest generation snapshot is just a proposal and is not in the code base yet.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Mon, Nov 21, 2016 at 3:20 PM, Mark Smith <m...@qq.is> wrote:
>>>> Jun,
>>>>
>>>> Thanks for the reply!
>>>>
>>>> I am aware the HW won't advance until the in-sync replicas have _requested_ the messages. However, I believe the issue is that the leader has no guarantee the replicas have _received_ the fetch response. There is no second phase to the commit.
>>>>
>>>> So, in the particular case where a leader transition happens, I believe this race condition exists (and I'm happy to be wrong here, but it looks feasible and explains the data loss I saw):
>>>>
>>>> 1. Starting point: Leader and Replica both only have up to message #36
>>>> 2. Client produces a new message with required.acks=all
>>>> 3. Leader commits message #37, but the HW is still #36 and the produce request is blocked
>>>> 4. Replica fetches messages (leader has RECEIVED the fetch request)
>>>> 5. Leader then advances the HW to #37 and unblocks the produce request; the client believes it's durable
>>>> 6. PREFERRED REPLICA ELECTION BEGINS
>>>> 7. Replica starts the become-leader process
>>>> 8. Leader finishes sending the fetch response; the replica is only now seeing message #37
>>>> 9. Replica throws away the fetch response from step 4 because it is now becoming leader (the partition has been removed from partitionMap, so it looks like the data is ignored)
>>>> 10. Leader starts become-follower
>>>> 11. Leader truncates to the replica's HW offset of #36
>>>> 12. Message #37 was durably committed but is now lost
>>>>
>>>> For the tickets you linked:
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-3670
>>>> * There was no shutdown involved in this case, so this shouldn't be a factor.
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1211
>>>> * I've read through this, but I'm not entirely sure it addresses the above. I don't think it does, though. I don't see a step in the ticket about become-leader making a call to the old leader to get the latest generation snapshot?
>>>>
>>>> --
>>>> Mark Smith
>>>> m...@qq.is
>>>>
>>>> On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:
>>>> > Mark,
>>>> >
>>>> > Thanks for reporting this. First, a clarification. The HW is actually never advanced until all in-sync followers have fetched the corresponding message. For example, in step 2, if all follower replicas issue a fetch request at offset 10, it serves as an indication that all replicas have received messages up to offset 9. So only then is the HW advanced to offset 10 (which is not inclusive).
>>>> >
>>>> > I think the problem that you are seeing is probably caused by two known issues. The first one is https://issues.apache.org/jira/browse/KAFKA-1211. The issue is that the HW is propagated asynchronously from the leader to the followers. If the leadership changes multiple times very quickly, what can happen is that a follower first truncates its data up to the HW and then immediately becomes the new leader. Since the follower's HW may not be up to date, some previously committed messages could be lost. The second one is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is that controlled shutdown and leader balancing can cause leadership to change more than once quickly, which could expose the data loss problem in the first issue.
>>>> >
>>>> > The second issue has been fixed in 0.10.0. So, if you upgrade to that version or above, it should reduce the chance of hitting the first issue significantly. We are actively working on the first issue and hopefully it will be addressed in the next release.
>>>> >
>>>> > Jun
>>>> >
>>>> > On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <m...@qq.is> wrote:
>>>> > > Hey folks,
>>>> > >
>>>> > > I work at Dropbox and I was doing some maintenance yesterday, and it looks like we lost some committed data during a preferred replica election. As far as I understand this shouldn't happen, but I have a theory and want to run it by y'all.
>>>> > >
>>>> > > Preamble:
>>>> > > * Kafka 0.9.0.1
>>>> > > * required.acks = -1 (All)
>>>> > > * min.insync.replicas = 2 (only 2 replicas for the partition, so we require both to have the data)
>>>> > > * consumer is Kafka Connect
>>>> > > * 1,400 topics, total of about 15,000 partitions
>>>> > > * 30 brokers
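For reference, a minimal sketch of the producer-side durability settings listed in the preamble, using the Java producer (the broker and topic names below are placeholders; the Java client expresses required.acks = -1 as acks=all, and min.insync.replicas is a broker/topic-level setting rather than a producer one):

    // Producer-side durability settings from the preamble (placeholder broker/topic names).
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DurableProducerSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");  // placeholder
            props.put("acks", "all");                        // the Java-client spelling of required.acks = -1
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            // min.insync.replicas=2 is configured on the broker or per topic, not here;
            // with acks=all it means both replicas must have the record before the ack.

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // get() blocks until the broker acks, i.e. until the leader's HW has
                // passed this record's offset.
                producer.send(new ProducerRecord<>("goscribe.client-host_activity", "key", "value")).get();
            }
        }
    }

With only two replicas and min.insync.replicas = 2, the ack should only come back after the follower's fetch position has moved past the record, which is what makes the truncation described below surprising.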
>>>> > > I was performing some rolling restarts of brokers yesterday as part of our regular DRT (disaster testing) process, and at the end that always leaves many partitions that need to be failed back to the preferred replica. There were about 8,000 partitions that needed moving. I started the election in Kafka Manager and it worked, but it looks like 4 of those 8,000 partitions experienced some relatively small amount of data loss at the tail.
>>>> > >
>>>> > > From the Kafka Connect point of view, we saw a handful of these:
>>>> > >
>>>> > > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5] INFO Fetch offset 67614479952 is out of range, resetting offset (o.a.k.c.c.i.Fetcher:595)
>>>> > >
>>>> > > I believe that was because it asked the new leader for data and the new leader had less data than the old leader. Indeed, the old leader became a follower and immediately truncated:
>>>> > >
>>>> > > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log goscribe.client-host_activity-21 to offset 67614479601.
>>>> > >
>>>> > > Given the above production settings, I don't know why KC would ever see an OffsetOutOfRange error, but this caused KC to reset to the beginning of the partition. Various broker logs for the failover paint the following timeline:
>>>> > > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
>>>> > >
>>>> > > My current working theory that I'd love eyes on:
>>>> > >
>>>> > > 1. Leader receives a produce request and appends to the log, incrementing the LEO, but given the durability requirements the HW is not incremented and the produce response is delayed (normal)
>>>> > > 2. Replica sends a Fetch request to the leader as part of the normal replication flow
>>>> > > 3. Leader increments the HW when it STARTS to respond to the Fetch request (per fetchMessages in ReplicaManager.scala), so the HW is updated as soon as we've prepared the messages for the response -- importantly, the HW is updated even though the replica has not yet actually seen the messages, even given the durability settings we've got
>>>> > > 4. Meanwhile, Kafka Connect sends a Fetch request to the leader and receives the messages below the new HW, but the messages still have not actually been received by the replica
>>>> > > 5. Preferred replica election begins (oh the travesty!)
>>>> > > 6. Replica starts the become-leader process, and makeLeader removes this partition from partitionMap, which means that when the response finally comes in, we ignore it (we discard the messages the old leader committed)
>>>> > > 7. Old leader starts the become-follower process and truncates to the HW of the new leader, i.e. the old leader has now thrown away data it had committed and given out moments ago
>>>> > > 8. Kafka Connect sends a Fetch request to the new leader, but its offset is now greater than the HW of the new leader, so we get the OffsetOutOfRange error and restart
>>>> > >
>>>> > > Can someone tell me whether or not this is plausible? If it is, is there a known issue/bug filed for it? I'm not exactly sure what the solution is, but it does seem unfortunate that a normal operation (a leader election with both brokers alive and well) can result in the loss of committed messages.
>>>> > >
>>>> > > And, if my theory doesn't hold, can anybody explain what happened? I'm happy to provide more logs or whatever.
>>>> > >
>>>> > > Thanks!
>>>> > >
>>>> > > --
>>>> > > Mark Smith
>>>> > > m...@qq.is
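As a closing aside on the consumer side of this report: the "resetting offset" log line above is the new consumer's auto.offset.reset policy kicking in. A minimal sketch, assuming the sink's underlying consumer runs with auto.offset.reset=earliest (group, broker, and topic names are placeholders; Kafka Connect builds this consumer internally):

    // Consumer-side behavior behind "Fetch offset ... is out of range, resetting offset".
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OffsetResetSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");      // placeholder
            props.put("group.id", "clogger-analytics-staging");  // placeholder
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            // "earliest": when the fetch position is beyond the (newly truncated) leader's
            // log end offset, the consumer silently rewinds to the start of the partition,
            // which is the re-read from the beginning described above. "none" would surface
            // it as an OffsetOutOfRangeException instead.
            props.put("auto.offset.reset", "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("goscribe.client-host_activity"));
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                System.out.println("fetched " + records.count() + " records");
            }
        }
    }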