Jun,

I see what's going on -- the leader doesn't update its HW as soon as the
follower has requested the messages, it updates when the follower
requests the _next_ messages. I.e., it infers that because the follower
requested from offset 38 that everything <= 37 is durable.

That makes sense. My understanding was wrong, and the race I described
isn't actually an issue. Thanks for helping clear that up.
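
To check my own understanding, here is a toy model of that rule in
Python. It is only a sketch and not Kafka's actual code: the `ToyLeader`
class and its method names are made up for illustration, and it assumes
a single follower in the ISR.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ToyLeader:
    """Hypothetical, simplified leader for one partition (not Kafka code)."""
    log_end_offset: int   # LEO: offset the next appended message will get
    high_watermark: int   # HW: first offset that is NOT yet committed
    follower_leo: Dict[str, int] = field(default_factory=dict)

    def append(self) -> int:
        """Append one message to the leader's log and return its offset."""
        offset = self.log_end_offset
        self.log_end_offset += 1
        return offset

    def on_follower_fetch(self, follower: str, fetch_offset: int) -> None:
        """A fetch from offset N implies the follower has every offset < N."""
        self.follower_leo[follower] = fetch_offset
        # HW is the smallest LEO across the leader and the in-sync followers.
        candidate = min(self.log_end_offset, *self.follower_leo.values())
        self.high_watermark = max(self.high_watermark, candidate)

    def is_committed(self, offset: int) -> bool:
        """An offset is committed once it is strictly below the HW."""
        return offset < self.high_watermark


leader = ToyLeader(log_end_offset=37, high_watermark=37,
                   follower_leo={"replica": 37})
offset = leader.append()                  # message lands at offset 37
leader.on_follower_fetch("replica", 37)   # replica asks for 37: not durable yet
hw_after_first_fetch = leader.high_watermark
leader.on_follower_fetch("replica", 38)   # replica asks for 38: 37 is durable
hw_after_second_fetch = leader.high_watermark
```

With the numbers from your example, the fetch at 37 leaves the HW at 37,
and only the fetch at 38 moves it to 38 and commits the message at
offset 37.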


Unfortunately, that means the data loss we saw is still unexplained. I
can recreate the conditions that led to it and see whether the problem
reproduces. If it does, I'll update this thread again.


--

Mark Smith

m...@qq.is





On Mon, Nov 21, 2016, at 06:59 PM, Jun Rao wrote:

> Hi, Mark,

> 

> So you did the manual leader election after the cluster had stabilized
> (i.e., all replicas were in sync)? Then it may not be related to the
> issue that I described.
> 

> If there is just a single leadership change, what you described
> shouldn't happen by design. I modified your steps to the following
> according to the design and I am not sure how the message can be lost.
> 

> 1. Starting point: Leader and Replica both only have up to message
>    #36, HW is at 37
> 2. Client produces new message with required.acks=all at offset 37
> 3. The produce request is blocked
> 4. Replica fetches messages at offset 37
> 5. Leader's HW still at 37 and the produce request still blocked
> 5.1 Replica receives message at 37 and appends it to local log
> 5.2 Replica fetches messages at offset 38
> 5.3 Leader's HW moves to 38 and the produce request is unblocked
> 5.4 Replica's HW still at 37
> 6. PREFERRED REPLICA ELECTION BEGIN
> 8. Replica becomes leader; no truncation is done; message at offset 37
>    is preserved; HW still at 37
> 9. Connect issues a fetch request at 38 to replica and gets an empty
>    response instead of OffsetOutOfRangeException since the log end
>    offset is at 38.
> 9. Leader becomes follower; truncate to HW 38, keeping message at
>    offset 37.
> 10. Leader starts fetch from offset 38
> 11. Replica moves HW to 38
> 12. Message #37 is preserved in both replicas and is not lost

> 

> 

> BTW, do you have unclean leader election disabled? Is this issue
> reproducible? If so, we can enable some debug level logging to see
> what's causing this. Now, I am also not sure if this is a broker side
> issue or a consumer side issue.
> 

> Thanks,

> 

> Jun

> 

> 

> On Mon, Nov 21, 2016 at 5:20 PM, Mark Smith <m...@qq.is> wrote:


>> Jun,

>> 

>> Yeah, I probably have an off-by-one issue in the HW description. I
>> think you could pick any number here and the problem remains -- could
>> you read through the steps I posted and see if they logically make
>> sense, numbers aside?
>> 

>> We definitely lost data in 4 partitions of the 8,000 that were
>> elected, and there was only a single election for each partition. We
>> had done a rolling restart hours before, but that had been done for a
>> long time and everything was stable. We do not allow automatic
>> election, it's a manual process that we initiate after the cluster
>> has stabilized.
>> 

>> So in this case, I still don't think any discussion about multiple
>> failovers is germane to the problem we saw. Each of our partitions
>> only had a single failover, and yet 4 of them still truncated
>> committed data.
>> 

>> --

>> Mark Smith

>> m...@qq.is

>> 

>> 

>> On Mon, Nov 21, 2016, at 05:12 PM, Jun Rao wrote:

>>> Hi, Mark,

>>> 

>>> Hmm, the committing of a message at offset X is the equivalent of
>>> saying that the HW is at offset X + 1. So, in your example, if the
>>> producer publishes a new message at offset 37, this message won't be
>>> committed (i.e., HW moves to offset 38) until the leader sees the
>>> follower fetch from offset 38 (not offset 37). At that point, the
>>> follower would have received message at offset 37 in the fetch
>>> response and appended that message to its local log. If the follower
>>> now becomes the new leader, message at offset 37 is preserved.
>>> 

>>> The problem that I described regarding data loss can happen during a
>>> rolling restart. Suppose that you have 3 replicas A, B, and C. Let's
>>> say A is the preferred leader, but during the deployment, the
>>> leader gets moved to replica B at some point and all 3 replicas are
>>> in sync. A new message is produced at offset 37 and is committed
>>> (leader's HW = 38). However, the HW in replica A is still at 37. Now,
>>> we try to shut down broker B and the leader gets moved to replica C.
>>> Replica A starts to follow replica C and it first truncates to HW
>>> 37, which removes the message at offset 37. Now, preferred leader
>>> logic kicks in and the leadership switches again to replica A. Since
>>> A doesn't have message at offset 37 any more and all followers copy
>>> messages from replica A, message at offset 37 is lost.
>>> 

>>> With KAFKA-3670, in the above example, when shutting down broker B,
>>> the leader will be directly moved to replica A since it's a
>>> preferred replica. So the above scenario won't happen.
>>> 

>>> The more complete fix is in KAFKA-1211. The logic for getting the
>>> latest generation snapshot is just a proposal and is not in the code
>>> base yet.
>>> 

>>> Thanks,

>>> 

>>> Jun

>>> 

>>> On Mon, Nov 21, 2016 at 3:20 PM, Mark Smith <m...@qq.is> wrote:

>>>> Jun,

>>>> 

>>>> Thanks for the reply!

>>>> 

>>>> I am aware the HW won't advance until the in-sync replicas have
>>>> _requested_ the messages. However, I believe the issue is that the
>>>> leader has no guarantee the replicas have _received_ the fetch
>>>> response. There is no second-phase to the commit.
>>>>
>>>> So, in the particular case where a leader transition happens, I
>>>> believe this race condition exists (and I'm happy to be wrong here,
>>>> but it looks feasible and explains the data loss I saw):

>>>> 

>>>> 1. Starting point: Leader and Replica both only have up to message
>>>>    #36
>>>> 2. Client produces new message with required.acks=all
>>>> 3. Leader commits message #37, but HW is still #36, the produce
>>>>    request is blocked
>>>> 4. Replica fetches messages (leader has RECEIVED the fetch request)
>>>> 5. Leader then advances HW to #37 and unblocks the produce request,
>>>>    client believes it's durable
>>>> 6. PREFERRED REPLICA ELECTION BEGIN
>>>> 7. Replica starts become-leader process
>>>> 8. Leader finishes sending fetch response, replica is just now
>>>>    seeing message #37
>>>> 9. Replica throws away fetch response from step 4 because it is now
>>>>    becoming leader (partition has been removed from partitionMap so
>>>>    it looks like data is ignored)
>>>> 10. Leader starts become-follower
>>>> 11. Leader truncates to replica HW offset of #36
>>>> 12. Message #37 was durably committed but is now lost

>>>> 

>>>> For the tickets you linked:

>>>> 

>>>> https://issues.apache.org/jira/browse/KAFKA-3670
>>>> * There was no shutdown involved in this case, so this shouldn't be
>>>>   a factor.
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1211
>>>> * I've read through this but I'm not entirely sure if it addresses
>>>>   the above. I don't think it does, though. I don't see a step in
>>>>   the ticket about become-leader making a call to the old leader to
>>>>   get the latest generation snapshot?

>>>> 
>>>> --
>>>>  Mark Smith
>>>> m...@qq.is

>>>> 

>>>> On Fri, Nov 18, 2016, at 10:52 AM, Jun Rao wrote:

>>>> > Mark,

>>>> >

>>>> > Thanks for reporting this. First, a clarification. The HW is
>>>> > actually never advanced until all in-sync followers have fetched
>>>> > the corresponding message. For example, in step 2, if all follower
>>>> > replicas issue a fetch request at offset 10, it serves as an
>>>> > indication that all replicas have received messages up to offset
>>>> > 9. So, only then, the HW is advanced to offset 10 (which is not
>>>> > inclusive).
>>>> >
>>>> > I think the problem that you are seeing is probably caused by two
>>>> > known issues. The first one is
>>>> > https://issues.apache.org/jira/browse/KAFKA-1211.
>>>> > The issue is that the HW is propagated asynchronously from the
>>>> > leader to the followers. If the leadership changes multiple times
>>>> > very quickly, what can happen is that a follower first truncates
>>>> > its data up to HW and then immediately becomes the new leader.
>>>> > Since the follower's HW may not be up to date, some previously
>>>> > committed messages could be lost. The second one is
>>>> > https://issues.apache.org/jira/browse/KAFKA-3670. The issue is
>>>> > that controlled shutdown and leader balancing can cause leadership
>>>> > to change more than once quickly, which could expose the data loss
>>>> > problem in the first issue.
>>>> >
>>>> > The second issue has been fixed in 0.10.0. So, if you upgrade to
>>>> > that version or above, it should reduce the chance of hitting the
>>>> > first issue significantly. We are actively working on the first
>>>> > issue and hopefully it will be addressed in the next release.

>>>> >

>>>> > Jun

>>>> >

>>>> > On Thu, Nov 17, 2016 at 5:39 PM, Mark Smith <m...@qq.is> wrote:

>>>> >

>>>> > > Hey folks,

>>>> > >

>>>> > > I work at Dropbox and I was doing some maintenance yesterday and
>>>> > > it looks like we lost some committed data during a preferred
>>>> > > replica election. As far as I understand this shouldn't happen,
>>>> > > but I have a theory and want to run it by y'all.

>>>> > >

>>>> > > Preamble:
>>>> > > * Kafka 0.9.0.1
>>>> > > * required.acks = -1 (All)
>>>> > > * min.insync.replicas = 2 (only 2 replicas for the partition, so
>>>> > >   we require both to have the data)
>>>> > > * consumer is Kafka Connect
>>>> > > * 1400 topics, total of about 15,000 partitions
>>>> > > * 30 brokers

>>>> > >

>>>> > > I was performing some rolling restarts of brokers yesterday as
>>>> > > part of our regular DRT (disaster testing) process and at the
>>>> > > end that always leaves many partitions that need to be failed
>>>> > > back to the preferred replica. There were about 8,000 partitions
>>>> > > that needed moving. I started the election in Kafka Manager and
>>>> > > it worked, but it looks like 4 of those 8,000 partitions
>>>> > > experienced some relatively small amount of data loss at the
>>>> > > tail.

>>>> > >

>>>> > > From the Kafka Connect point of view, we saw a handful of these:
>>>> > >
>>>> > > [2016-11-17 02:55:26,513] [WorkerSinkTask-clogger-analytics-staging-8-5] INFO Fetch offset 67614479952 is out of range, resetting offset (o.a.k.c.c.i.Fetcher:595)

>>>> > >

>>>> > > I believe that was because it asked the new leader for data and
>>>> > > the new leader had less data than the old leader. Indeed, the
>>>> > > old leader became a follower and immediately truncated:
>>>> > >
>>>> > > 2016-11-17 02:55:27,237 INFO log.Log: Truncating log goscribe.client-host_activity-21 to offset 67614479601.

>>>> > >

>>>> > > Given the above production settings I don't know why KC would
>>>> > > ever see an OffsetOutOfRange error but this caused KC to reset to
>>>> > > the beginning of the partition. Various broker logs for the
>>>> > > failover paint the following timeline:
>>>> > > https://gist.github.com/zorkian/d80a4eb288d40c1ee7fb5d2d340986d6
>>>> > >

>>>> > > My current working theory that I'd love eyes on:

>>>> > >

>>>> > >   1. Leader receives produce request and appends to log,
>>>> > >      incrementing LEO, but given the durability requirements the
>>>> > >      HW is not incremented and the produce response is delayed
>>>> > >      (normal)
>>>> > >
>>>> > >   2. Replica sends Fetch request to leader as part of normal
>>>> > >      replication flow
>>>> > >
>>>> > >   3. Leader increments HW when it STARTS to respond to the Fetch
>>>> > >      request (per fetchMessages in ReplicaManager.scala), so the
>>>> > >      HW is updated as soon as we've prepared messages for
>>>> > >      response -- importantly the HW is updated even though the
>>>> > >      replica has not yet actually seen the messages, even given
>>>> > >      the durability settings we've got
>>>> > >
>>>> > >   4. Meanwhile, Kafka Connect sends Fetch request to leader and
>>>> > >      receives the messages below the new HW, but the messages
>>>> > >      have not actually been received by the replica yet still
>>>> > >
>>>> > >   5. Preferred replica election begins (oh the travesty!)
>>>> > >
>>>> > >   6. Replica starts the become-leader process and makeLeader
>>>> > >      removes this partition from partitionMap, which means when
>>>> > >      the response comes in finally, we ignore it (we discard the
>>>> > >      old-leader committed messages)
>>>> > >
>>>> > >   7. Old-leader starts become-follower process and truncates to
>>>> > >      the HW of the new-leader i.e. the old-leader has now thrown
>>>> > >      away data it had committed and given out moments ago
>>>> > >
>>>> > >   8. Kafka Connect sends Fetch request to the new-leader but its
>>>> > >      offset is now greater than the HW of the new-leader, so we
>>>> > >      get the OffsetOutOfRange error and restart

>>>> > >

>>>> > > Can someone tell me whether or not this is plausible? If it is,
>>>> > > is there a known issue/bug filed for it? I'm not exactly sure
>>>> > > what the solution is, but it does seem unfortunate that a normal
>>>> > > operation (leader election with both brokers alive and well) can
>>>> > > result in the loss of committed messages.
>>>> > >
>>>> > > And, if my theory doesn't hold, can anybody explain what
>>>> > > happened? I'm happy to provide more logs or whatever.

>>>> > >

>>>> > > Thanks!

>>>> > >

>>>> > >

>>>> > > --

>>>> > > Mark Smith

>>>> > > m...@qq.is

>>>> > >

>> 

