Jun,

Thanks for the reply!

I am aware the HW won't advance until the in-sync replicas have
_requested_ the messages. However, I believe the issue is that the
leader has no guarantee the replicas have _received_ the fetch response.
There is no second phase to the commit.

So, in the particular case where a leader transition happens, I believe
this race condition exists (and I'm happy to be wrong here, but it looks
feasible and explains the data loss I saw):

1. Starting point: Leader and Replica both only have up to message #36
2. Client produces new message with required.acks=all
3. Leader appends message #37 to its log, but HW is still #36, so the
produce request is blocked
4. Replica fetches messages (leader has RECEIVED the fetch request)
5. Leader then advances HW to #37 and unblocks the produce request,
client believes it's durable
6. PREFERRED REPLICA ELECTION BEGIN
7. Replica starts become-leader process
8. Leader finishes sending fetch response, replica is just now seeing
message #37
9. Replica throws away fetch response from step 4 because it is now
becoming leader (partition has been removed from partitionMap so it
looks like data is ignored)
10. Leader starts become-follower
11. Leader truncates to replica HW offset of #36
12. Message #37 was durably committed but is now lost
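
To make the ordering concrete, here is a toy sketch of the steps above in
Python. It is not Kafka code: Broker, truncate_to and simulate_race are
names I made up for illustration, and it bakes in the assumption I'm
asking about, namely that the leader advances the HW once it has received
the fetch request rather than once the replica has the data.

```python
from dataclasses import dataclass, field


@dataclass
class Broker:
    """Hypothetical stand-in for one broker's view of a single partition."""
    name: str
    log: list = field(default_factory=list)  # message numbers appended so far
    hw: int = 0                               # highest message number considered committed
    is_leader: bool = False

    def truncate_to(self, offset: int) -> None:
        # Drop every message numbered above `offset`.
        self.log = [m for m in self.log if m <= offset]


def simulate_race() -> tuple:
    # Step 1: both brokers have messages up to #36.
    leader = Broker("old-leader", log=list(range(1, 37)), hw=36, is_leader=True)
    replica = Broker("replica", log=list(range(1, 37)), hw=36)

    # Steps 2-3: leader appends #37; HW stays at #36 and the produce is blocked.
    leader.log.append(37)
    acked = False

    # Step 4: leader has RECEIVED the fetch request and prepares a response.
    in_flight = {"messages": [37], "leader_hw": leader.hw}

    # Step 5 (the assumption under test): HW advances and the producer is
    # acked before the replica has actually received the response.
    leader.hw = 37
    acked = True

    # Steps 6-7: election begins; replica becomes leader and stops fetching.
    replica.is_leader = True
    still_fetching = False  # models the partition being removed from partitionMap

    # Steps 8-9: the response arrives, but the replica discards it.
    if still_fetching:
        replica.log.extend(in_flight["messages"])
        replica.hw = in_flight["leader_hw"]

    # Steps 10-11: old leader becomes follower and truncates to the replica's HW.
    leader.is_leader = False
    leader.truncate_to(replica.hw)
    leader.hw = replica.hw

    # Step 12: the producer was acked, yet neither log still contains #37.
    return acked, leader.log[-1], replica.log[-1]
```

Calling simulate_race() returns whether the producer was acked together
with the last message number left on each broker, so the loss shows up as
an acked produce with both logs ending at #36.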

For the tickets you linked:

https://issues.apache.org/jira/browse/KAFKA-3670
* There was no shutdown involved in this case, so this shouldn't be a
factor here.

https://issues.apache.org/jira/browse/KAFKA-1211
* I've read through this, but I'm not sure it addresses the above; I
don't think it does. I don't see a step in the ticket where become-leader
calls the old leader to get the latest generation snapshot. Is there
one?

-- 
Mark Smith
m...@qq.is

On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:
> Mark,
> 
> Thanks for reporting this. First, a clarification. The HW is actually
> never advanced until all in-sync followers have fetched the
> corresponding message. For example, in step 2, if all follower replicas
> issue a fetch request at offset 10, it serves as an indication that all
> replicas have received messages up to offset 9. So, only then, the HW is
> advanced to offset 10 (which is not inclusive).
> 
> I think the problem that you are seeing is probably caused by two known
> issues. The first one is
> https://issues.apache.org/jira/browse/KAFKA-1211.
> The issue is that the HW is propagated asynchronously from the leader to
> the followers. If the leadership changes multiple times very quickly, what
> can happen is that a follower first truncates its data up to HW and then
> immediately becomes the new leader. Since the follower's HW may not be up
> to date, some previously committed messages could be lost. The second one
> is https://issues.apache.org/jira/browse/KAFKA-3670. The issue is that
> controlled shutdown and leader balancing can cause leadership to change
> more than once quickly, which could expose the data loss problem in the
> first issue.
> 
> The second issue has been fixed in 0.10.0. So, if you upgrade to that
> version or above, it should reduce the chance of hitting the first issue
> significantly. We are actively working on the first issue and hopefully
> it will be addressed in the next release.
> 
> Jun
> 
> On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <m...@qq.is> wrote:
> 
> > Hey folks,
> >
> > I work at Dropbox and I was doing some maintenance yesterday and it
> > looks like we lost some committed data during a preferred replica
> > election. As far as I understand this shouldn't happen, but I have a
> > theory and want to run it by y'all.
> >
> > Preamble:
> > * Kafka 0.9.0.1
> > * required.acks = -1 (All)
> > * min.insync.replicas = 2 (only 2 replicas for the partition, so we
> > require both to have the data)
> > * consumer is Kafka Connect
> > * 1400 topics, total of about 15,000 partitions
> > * 30 brokers
> >
> > I was performing some rolling restarts of brokers yesterday as part of
> > our regular DRT (disaster testing) process and at the end that always
> > leaves many partitions that need to be failed back to the preferred
> > replica. There were about 8,000 partitions that needed moving. I started
> > the election in Kafka Manager and it worked, but it looks like 4 of
> > those 8,000 partitions experienced some relatively small amount of data
> > loss at the tail.
> >
> > From the Kafka Connect point of view, we saw a handful of these:
> >
> > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5]
> > INFO Fetch offset 67614479952 is out of range, resetting offset
> > (o.a.k.c.c.i.Fetcher:595)
> >
> > I believe that was because it asked the new leader for data and the new
> > leader had less data than the old leader. Indeed, the old leader became
> > a follower and immediately truncated:
> >
> > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log
> > goscribe.client-host_activity-21 to offset 67614479601.
> >
> > Given the above production settings I don't know why KC would ever see
> > an OffsetOutOfRange error but this caused KC to reset to the beginning
> > of the partition. Various broker logs for the failover paint the
> > following timeline:
> > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
> >
> > My current working theory that I'd love eyes on:
> >
> >   1. Leader receives produce request and appends to log, incrementing
> >   LEO, but given the durability requirements the HW is not incremented
> >   and the produce response is delayed (normal)
> >
> >   2. Replica sends Fetch request to leader as part of normal replication
> >   flow
> >
> >   3. Leader increments HW when it STARTS to respond to the Fetch request
> >   (per fetchMessages in ReplicaManager.scala), so the HW is updated as
> >   soon as we've prepared messages for response -- importantly the HW is
> >   updated even though the replica has not yet actually seen the
> >   messages, even given the durability settings we've got
> >
> >   4. Meanwhile, Kafka Connect sends Fetch request to leader and receives
> >   the messages below the new HW, but the messages have still not
> >   actually been received by the replica
> >
> >   5. Preferred replica election begins (oh the travesty!)
> >
> >   6. Replica starts the become-leader process and makeLeader removes
> >   this partition from partitionMap, which means when the response comes
> >   in finally, we ignore it (we discard the old-leader committed
> >   messages)
> >
> >   7. Old-leader starts become-follower process and truncates to the HW
> >   of the new-leader i.e. the old-leader has now thrown away data it had
> >   committed and given out moments ago
> >
> >   8. Kafka Connect sends Fetch request to the new-leader but its offset
> >   is now greater than the HW of the new-leader, so we get the
> >   OffsetOutOfRange error and restart
> >
> > Can someone tell me whether or not this is plausible? If it is, is there
> > a known issue/bug filed for it? I'm not exactly sure what the solution
> > is, but it does seem unfortunate that a normal operation (leader
> > election with both brokers alive and well) can result in the loss of
> > committed messages.
> >
> > And, if my theory doesn't hold, can anybody explain what happened? I'm
> > happy to provide more logs or whatever.
> >
> > Thanks!
> >
> >
> > --
> > Mark Smith
> > m...@qq.is
> >
