Hello folks,

I've also updated the KIP wiki page to add a section on the proposed
metrics for this Raft protocol:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP-595:ARaftProtocolfortheMetadataQuorum-Metrics

Please let us know if you have any thoughts about them as well.

Guozhang

On Thu, May 7, 2020 at 4:04 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks Jun. Further replies are inline.
>
> On Mon, May 4, 2020 at 11:58 AM Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Guozhang,
>>
>> Thanks for the reply. A few more replies inlined below.
>>
>> On Sun, May 3, 2020 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Jun,
>> >
>> > Thanks for your comments! I'm replying inline below:
>> >
>> > On Fri, May 1, 2020 at 12:36 PM Jun Rao <j...@confluent.io> wrote:
>> >
>> > > 101. Bootstrapping related issues.
>> > > 101.1 Currently, we support auto broker id generation. Is this
>> supported
>> > > for bootstrap brokers?
>> > >
>> >
>> > The vote ids would just be the broker ids. "bootstrap.servers" would be
>> > similar to what client configs have today, where "quorum.voters" would
>> be
>> > pre-defined config values.
>> >
>> >
>> My question was on the auto generated broker id. Currently, the broker can
>> choose to have its broker Id auto generated. The generation is done
>> through
>> ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id
>> is
>> auto generated. "quorum.voters" also can't be set statically if broker ids
>> are auto generated.
>>
> Jason has explained some ideas that we've discussed so far. The reason we
> intentionally did not include them is that we feel they are outside the
> scope of KIP-595. Under the umbrella of KIP-500 we should definitely
> address them though.
>
> At a high level, our belief is that "joining a quorum" and "joining (or
> more specifically, registering brokers in) the cluster" would be
> decoupled a bit, where the former should be completed before we do the
> latter. More specifically, assuming the quorum is already up and running,
> after the newly started broker finds the leader of the quorum it can send a
> specific RegisterBroker request including its listener / protocol / etc.,
> and upon handling it the leader can send back the uniquely generated broker
> id to the new broker, while also executing the "startNewBroker" callback as
> the controller.
>
>
>>
>> > > 102. Log compaction. One weak spot of log compaction is for the
>> consumer
>> > to
>> > > deal with deletes. When a key is deleted, it's retained as a tombstone
>> > > first and then physically removed. If a client misses the tombstone
>> > > (because it's physically removed), it may not be able to update its
>> > > metadata properly. The way we solve this in Kafka is based on a
>> > > configuration (log.cleaner.delete.retention.ms) and we expect a
>> consumer
>> > > having seen an old key to finish reading the deletion tombstone within
>> > that
>> > > time. There is no strong guarantee for that since a broker could be
>> down
>> > > for a long time. It would be better if we can have a more reliable
>> way of
>> > > dealing with deletes.
>> > >
>> >
>> > We propose to capture this in the "FirstDirtyOffset" field of the quorum
>> > record fetch response: the offset is the maximum offset that log
>> > compaction has reached. If the follower has fetched beyond this offset,
>> > it is safe, since it has seen all records up to that offset. On getting
>> > the response, the follower can then decide whether its end offset is
>> > actually below that dirty offset (and hence it may have missed some
>> > tombstones). If that's the case:
>> >
>> > 1) Naively, it could re-bootstrap the metadata log from the very
>> > beginning to catch up.
>> > 2) During that time, it would refrain from answering MetadataRequest
>> > from any clients.
>> >
>> >
>> I am not sure that the "FirstDirtyOffset" field fully addresses the issue.
>> Currently, the deletion tombstone is not removed immediately after a round
>> of cleaning. It's removed after a delay in a subsequent round of cleaning.
>> Consider an example where a key insertion is at offset 200 and a deletion
>> tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300. A
>> follower/observer fetches from offset 0  and fetches the key at offset
>> 200.
>> A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the
>> tombstone at 400 is physically removed. The follower/observer continues
>> the
>> fetch, but misses offset 400. It catches all the way to FirstDirtyOffset
>> and declares its metadata as ready. However, its metadata could be stale
>> since it actually misses the deletion of the key.
>>
> Yeah, good question, I should have put more details in my explanation :)
>
> The idea is that we will adjust the log compaction for this Raft-based
> metadata log. Before going into more details, note that we have two types
> of "watermarks" here: in Kafka the watermark indicates where every
> replica has replicated up to, whereas in Raft the watermark indicates where
> the majority of replicas (here only indicating voters of the quorum, not
> counting observers) have replicated up to. Let's call them the Kafka
> watermark and the Raft watermark. For this special log, we would maintain
> both watermarks.
>
> When log compacting on the leader, we would only compact up to the Kafka
> watermark, i.e. if there is at least one voter that has not replicated an
> entry, it would not be compacted. The "dirty-offset" is the offset that
> we've compacted up to and is communicated to other voters, and the other
> voters would also compact up to this value --- i.e. the difference here is
> that instead of letting each replica do log compaction independently,
> we'll have the leader decide which offset to compact to, and
> propagate this value to others to follow, in a more coordinated manner.
> Also note that when there are new voters joining the quorum that have not
> replicated up to the dirty-offset, or that because of other issues have
> truncated their logs to below the dirty-offset, they'd have to re-bootstrap
> from the beginning. During this period of time the leader, having learned
> about this lagging voter, would not advance the watermark (nor would it
> decrement it), and hence would not compact either, until the voter(s) have
> caught up to that dirty-offset.
>
> So back to your example above, before the bootstrapping voter gets to 300 no
> log compaction would happen on the leader; and only later, when the voter
> has got beyond 400 and hence replicated that tombstone, could the log
> compaction possibly get to that tombstone and remove it. Say later
> the leader's log compaction reaches 500; it can send this back to the voter,
> which can then also compact locally up to 500.
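
The coordinated compaction rule described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names (WatermarkSketch, kafkaWatermark, raftWatermark, nextDirtyOffset) are hypothetical and do not come from the KIP or the prototype, and the voters' end offsets are assumed to be known to the leader as a plain array.

```java
import java.util.Arrays;

// Hypothetical sketch; none of these names are taken from KIP-595.
final class WatermarkSketch {

    // "Kafka watermark": the offset that every voter has replicated up to,
    // i.e. the minimum end offset across all voters.
    static long kafkaWatermark(long[] voterEndOffsets) {
        return Arrays.stream(voterEndOffsets).min().orElse(0L);
    }

    // "Raft watermark": the offset that a majority of voters has replicated up to.
    static long raftWatermark(long[] voterEndOffsets) {
        if (voterEndOffsets.length == 0) {
            return 0L;
        }
        long[] sorted = voterEndOffsets.clone();
        Arrays.sort(sorted);
        int majority = sorted.length / 2 + 1;
        // In ascending order, the entry at (n - majority) is reached by at least `majority` voters.
        return sorted[sorted.length - majority];
    }

    // The leader compacts no further than the Kafka watermark and never moves
    // the dirty offset backwards while a lagging voter catches up.
    static long nextDirtyOffset(long currentDirtyOffset, long cleanableUpTo, long[] voterEndOffsets) {
        long limit = Math.min(cleanableUpTo, kafkaWatermark(voterEndOffsets));
        return Math.max(currentDirtyOffset, limit);
    }

    public static void main(String[] args) {
        // Offsets loosely follow the example in this thread: dirty offset 300, tombstone at 400.
        long[] bootstrapping = {600L, 600L, 250L};
        long[] caughtUp = {600L, 600L, 600L};
        System.out.println(nextDirtyOffset(300L, 500L, bootstrapping));
        System.out.println(nextDirtyOffset(300L, 500L, caughtUp));
    }
}
```

With a voter still at offset 250, the dirty offset stays where it is; once every voter is past the cleanable point, the leader can advance it and propagate the new value.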
>
>
>> > > 105. Quorum State: In addition to VotedId, do we need the epoch
>> > > corresponding to VotedId? Over time, the same broker Id could be
>> voted in
>> > > different generations with different epoch.
>> > >
>> > >
>> > Hmm, this is a good point. Originally I think the "LeaderEpoch" field in
>> > that file is corresponding to the "latest known leader epoch", not the
>> > "current leader epoch". For example, if the current epoch is N, and
>> then a
>> > vote-request with epoch N+1 is received and the voter granted the vote
>> for
>> > it, then it means for this voter it knows the "latest epoch" is N + 1
>> > although it is unknown if that sending candidate will indeed become the
>> new
>> > leader (which would only be notified via begin-quorum request). However,
>> > when persisting the quorum state, we would encode leader-epoch to N+1,
>> > while the leaderId to be the older leader.
>> >
>> > But now thinking about this a bit more, I feel we should use two
>> separate
>> > epochs, one for the "latest known" and one for the "current" to pair with
>> > the leaderId. I will update the wiki page.
>> >
>> >
>> Hmm, it's kind of weird to bump up the leader epoch before the new leader
>> is actually elected, right?
>>
>>
>> > > 106. "OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to
>> indicate
>> > > that the follower has fetched from an invalid offset and should
>> truncate
>> > to
>> > > the offset/epoch indicated in the response." Observers can't truncate
>> > their
>> > > logs. What should they do with OFFSET_OUT_OF_RANGE?
>> > >
>> > >
>> > I'm not sure if I understand your question? Observers should still be
>> able
>> > to truncate their logs as well.
>> >
>> >
>> Hmm, I thought only the quorum nodes have local logs and observers don't?
>>
>> > > 107. "The leader will continue sending BeginQuorumEpoch to each known
>> > voter
>> > > until it has received its endorsement." If a voter is down for a long
>> > time,
>> > > sending BeginQuorumEpoch seems to add unnecessary overhead. Similarly,
>> > if a
>> > > follower stops sending FetchQuorumRecords, does the leader keep
>> sending
>> > > BeginQuorumEpoch?
>> > >
>> >
>> > Regarding BeginQuorumEpoch: that is a good point. The begin-quorum-epoch
>> > request is for voters to quickly get the new leader information; however
>> > even if they do not get them they can still eventually learn about that
>> > from others via gossiping FindQuorum. I think we can adjust the logic to
>> > use e.g. exponential back-off or a limited num.retries.
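
As a rough illustration of the back-off idea, here is a minimal sketch of a capped exponential retry delay combined with a retry limit. The class name, method names and parameters (BeginQuorumEpochBackoff, retryDelayMs, shouldRetry, initialMs, maxMs, maxRetries) are assumptions made for this example only; the KIP does not define them.

```java
// Hypothetical sketch; these names and parameters are not defined by KIP-595.
final class BeginQuorumEpochBackoff {

    // Delay before retry number `attempt` (0-based): the initial delay doubles
    // on each attempt and is capped at maxMs.
    static long retryDelayMs(int attempt, long initialMs, long maxMs) {
        long delay = Math.min(initialMs, maxMs);
        for (int i = 0; i < attempt; i++) {
            // Compare against maxMs / 2 so the doubling can never overflow.
            delay = (delay > maxMs / 2) ? maxMs : delay * 2;
        }
        return delay;
    }

    // Stop resending once the retry budget for this voter is used up.
    static boolean shouldRetry(int attempt, int maxRetries) {
        return attempt < maxRetries;
    }

    public static void main(String[] args) {
        for (int attempt = 0; shouldRetry(attempt, 8); attempt++) {
            System.out.println("attempt " + attempt + " waits " + retryDelayMs(attempt, 100L, 5000L) + " ms");
        }
    }
}
```

A voter that stays down for a long time then costs the leader only a bounded number of requests, and it can still learn about the new leader later through FindQuorum.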
>> >
>> > Regarding FetchQuorumRecords: if the follower already sends
>> > FetchQuorumRecords, it means that follower already knows that the broker
>> > is the leader, and hence we can stop retrying BeginQuorumEpoch; however,
>> > it is possible that a follower that has been sending FetchQuorumRecords
>> > suddenly stops sending it (possibly because it learned about a leader
>> > with a higher epoch), and hence this broker may be a "zombie" leader. We
>> > propose to use the fetch.timeout to let the leader try to verify whether
>> > it has already become stale.
>> >
>> >
>> It just seems that we should handle these two cases in a consistent way?
>>
> Yes, I agree: on the leader's side, a FetchQuorumRecords from a follower
> could mean that we no longer need to send BeginQuorumEpoch --- and
> it is already part of our current implementation in
> https://github.com/confluentinc/kafka/commits/kafka-raft
>
>
>> Thanks,
>>
>> Jun
>>
>> >
>> > >
>> > > Jun
>> > >
>> > > On Wed, Apr 29, 2020 at 8:56 PM Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> > >
>> > > > Hello Leonard,
>> > > >
>> > > > Thanks for your comments, I'm replying inline below:
>> > > >
>> > > > On Wed, Apr 29, 2020 at 1:58 AM Wang (Leonard) Ge <w...@confluent.io
>> >
>> > > > wrote:
>> > > >
>> > > > > Hi Kafka developers,
>> > > > >
>> > > > > It's great to see this proposal and it took me some time to finish
>> > > > reading
>> > > > > it.
>> > > > >
>> > > > > And I have the following questions about the Proposal:
>> > > > >
>> > > > >    - How do we plan to test this design to ensure its
>> correctness? Or
>> > > > more
>> > > > >    broadly, how do we ensure that our new ‘pull’ based model is
>> > > > functional
>> > > > > and
>> > > > >    correct given that it is different from the original RAFT
>> > > > implementation
>> > > > >    which has formal proof of correctness?
>> > > > >
>> > > >
>> > > > We have two planned verifications of the correctness and liveness of
>> > > > the design. One is via model verification (TLA+):
>> > > > https://github.com/guozhangwang/kafka-specification
>> > > >
>> > > > Another is via the concurrent simulation tests:
>> > > > https://github.com/confluentinc/kafka/commit/5c0c054597d2d9f458cad0cad846b0671efa2d91
>> > > >
>> > > >    - Have we considered any sensible defaults for the configuration,
>> > i.e.
>> > > > >    all the election timeout, fetch time out, etc.? Or we want to
>> > leave
>> > > > > this to
>> > > > >    a later stage when we do the performance testing, etc.
>> > > > >
>> > > >
>> > > > This is a good question. The reason we did not set any default values
>> > > > for the timeout configurations is that we think it may take some
>> > > > benchmarking experiments to get these defaults right. Some high-level
>> > > > principles to consider: 1) the fetch.timeout should be on about the
>> > > > same scale as the ZK session timeout, which is now 18 seconds by
>> > > > default -- in practice we've seen unstable networks having more than
>> > > > 10 secs of transient connectivity; 2) the election.timeout, however,
>> > > > should be smaller than the fetch timeout, as is also suggested as a
>> > > > practical optimization in the literature:
>> > > > https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf
>> > > >
>> > > > Some more discussions can be found here:
>> > > > https://github.com/confluentinc/kafka/pull/301/files#r415420081
>> > > >
>> > > >
>> > > > >    - Have we considered piggybacking `BeginQuorumEpoch` with the `
>> > > > >    FetchQuorumRecords`? I might be missing something obvious but
>> I am
>> > > > just
>> > > > >    wondering why don’t we just use the `FindQuorum` and
>> > > > > `FetchQuorumRecords`
>> > > > >    APIs and remove the `BeginQuorumEpoch` API?
>> > > > >
>> > > >
>> > > > Note that Begin/EndQuorumEpoch is sent from leader -> other voter
>> > > > followers, while FindQuorum / Fetch are sent from follower to
>> leader.
>> > > > Arguably one can eventually realize the new leader and epoch via
>> > > gossiping
>> > > > FindQuorum, but that could in practice require a long delay. Having
>> a
>> > > > leader -> other voters request helps the new leader epoch to be
>> > > propagated
>> > > > faster under a pull model.
>> > > >
>> > > >
>> > > > >    - And about the `FetchQuorumRecords` response schema, in the
>> > > `Records`
>> > > > >    field of the response, is it just one record or all the records
>> > > > starting
>> > > > >    from the FetchOffset? It seems a lot more efficient if we sent
>> all
>> > > the
>> > > > >    records during the bootstrapping of the brokers.
>> > > > >
>> > > >
>> > > > Yes the fetching is batched: FetchOffset is just the starting
>> offset of
>> > > the
>> > > > batch of records.
>> > > >
>> > > >
>> > > > >    - Regarding the disruptive broker issues, does our pull based
>> > model
>> > > > >    suffer from it? If so, have we considered the Pre-Vote stage?
>> If
>> > > not,
>> > > > > why?
>> > > > >
>> > > > >
>> > > > The disruptive broker issue is described in the original Raft paper and
>> > > > is a result of the push model design. Our analysis showed that with the
>> > > > pull model it is no longer an issue.
>> > > >
>> > > >
>> > > > > Thanks a lot for putting this up, and I hope that my questions
>> can be
>> > > of
>> > > > > some value to make this KIP better.
>> > > > >
>> > > > > Hope to hear from you soon!
>> > > > >
>> > > > > Best wishes,
>> > > > > Leonard
>> > > > >
>> > > > >
>> > > > > On Wed, Apr 29, 2020 at 1:46 AM Colin McCabe <cmcc...@apache.org>
>> > > wrote:
>> > > > >
>> > > > > > Hi Jason,
>> > > > > >
>> > > > > > It's amazing to see this coming together :)
>> > > > > >
>> > > > > > I haven't had a chance to read in detail, but I read the outline
>> > and
>> > > a
>> > > > > few
>> > > > > > things jumped out at me.
>> > > > > >
>> > > > > > First, for every epoch that is 32 bits rather than 64, I sort of
>> > > wonder
>> > > > > if
>> > > > > > that's a good long-term choice.  I keep reading about stuff like
>> > > this:
>> > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1277 .
>> Obviously,
>> > > > that
>> > > > > > JIRA is about zxid, which increments much faster than we expect
>> > these
>> > > > > > leader epochs to, but it would still be good to see some rough
>> > > > > calculations
>> > > > > > about how long 32 bits (or really, 31 bits) will last us in the
>> > cases
>> > > > > where
>> > > > > > we're using it, and what the space savings we're getting really
>> is.
>> > > It
>> > > > > > seems like in most cases the tradeoff may not be worth it?
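
A back-of-the-envelope version of the calculation asked for above could look like the sketch below. The election rates are purely hypothetical inputs chosen for illustration, not measurements, and the class and method names are made up for this example. It assumes the epoch only increases by one per election and that a signed 32-bit field gives 2^31 - 1 usable values.

```java
// Hypothetical sketch; the election rates used here are assumptions, not measured data.
final class EpochLifetimeSketch {

    // Years until a counter with `maxEpoch` usable values is exhausted,
    // given an assumed steady rate of leader elections per day.
    static double yearsUntilExhausted(long maxEpoch, double electionsPerDay) {
        return maxEpoch / electionsPerDay / 365.0;
    }

    public static void main(String[] args) {
        long max31Bits = Integer.MAX_VALUE; // 2^31 - 1
        // One election per second, i.e. 86,400 per day: a deliberately extreme rate.
        System.out.println(yearsUntilExhausted(max31Bits, 86_400.0));
        // One election per minute, i.e. 1,440 per day.
        System.out.println(yearsUntilExhausted(max31Bits, 1_440.0));
    }
}
```

Even at one election per second, 31 bits last on the order of 68 years, while the saving from the narrower type is 4 bytes per stored epoch field.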
>> > > > > >
>> > > > > > Another thing I've been thinking about is how we do
>> > bootstrapping.  I
>> > > > > > would prefer to be in a world where formatting a new Kafka node
>> > was a
>> > > > > first
>> > > > > > class operation explicitly initiated by the admin, rather than
>> > > > something
>> > > > > > that happened implicitly when you started up the broker and
>> things
>> > > > > "looked
>> > > > > > blank."
>> > > > > >
>> > > > > > The first problem is that things can "look blank" accidentally
>> if
>> > the
>> > > > > > storage system is having a bad day.  Clearly in the non-Raft
>> world,
>> > > > this
>> > > > > > leads to data loss if the broker that is (re)started this way
>> was
>> > the
>> > > > > > leader for some partitions.
>> > > > > >
>> > > > > > The second problem is that we have a bit of a chicken and egg
>> > problem
>> > > > > with
>> > > > > > certain configuration keys.  For example, maybe you want to
>> > configure
>> > > > > some
>> > > > > > connection security settings in your cluster, but you don't want
>> > them
>> > > > to
>> > > > > > ever be stored in a plaintext config file.  (For example, SCRAM
>> > > > > passwords,
>> > > > > > etc.)  You could use a broker API to set the configuration, but
>> > that
>> > > > > brings
>> > > > > > up the chicken and egg problem.  The broker needs to be
>> configured
>> > to
>> > > > > know
>> > > > > > how to talk to you, but you need to configure it before you can
>> > talk
>> > > to
>> > > > > > it.  Using an external secret manager like Vault is one way to
>> > solve
>> > > > > this,
>> > > > > > but not everyone uses an external secret manager.
>> > > > > >
>> > > > > > quorum.voters seems like a similar configuration key.  In the
>> > current
>> > > > > KIP,
>> > > > > > this is only read if there is no other configuration specifying
>> the
>> > > > > quorum
>> > > > > > voter set.  If we had a kafka.mkfs command, we wouldn't need
>> this
>> > key
>> > > > > > because we could assume that there was always quorum information
>> > > stored
>> > > > > > locally.
>> > > > > >
>> > > > > > best,
>> > > > > > Colin
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Apr 16, 2020, at 16:44, Jason Gustafson wrote:
>> > > > > > > Hi All,
>> > > > > > >
>> > > > > > > I'd like to start a discussion on KIP-595:
>> > > > > > >
>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum
>> > > > > > > .
>> > > > > > > This proposal specifies a Raft protocol to ultimately replace
>> > > > Zookeeper
>> > > > > > > as
>> > > > > > > documented in KIP-500. Please take a look and share your
>> > thoughts.
>> > > > > > >
>> > > > > > > A few minor notes to set the stage a little bit:
>> > > > > > >
>> > > > > > > - This KIP does not specify the structure of the messages
>> used to
>> > > > > > represent
>> > > > > > > metadata in Kafka, nor does it specify the internal API that
>> will
>> > > be
>> > > > > used
>> > > > > > > by the controller. Expect these to come in later proposals.
>> Here
>> > we
>> > > > are
>> > > > > > > primarily concerned with the replication protocol and basic
>> > > > operational
>> > > > > > > mechanics.
>> > > > > > > - We expect many details to change as we get closer to
>> > integration
>> > > > with
>> > > > > > > the controller. Any changes we make will be made either as
>> > > amendments
>> > > > > to
>> > > > > > > this KIP or, in the case of larger changes, as new proposals.
>> > > > > > > - We have a prototype implementation which I will put online
>> > within
>> > > > the
>> > > > > > > next week which may help in understanding some details. It has
>> > > > > diverged a
>> > > > > > > little bit from our proposal, so I am taking a little time to
>> > bring
>> > > > it
>> > > > > in
>> > > > > > > line. I'll post an update to this thread when it is available
>> for
>> > > > > review.
>> > > > > > >
>> > > > > > > Finally, I want to mention that this proposal was drafted by
>> > > myself,
>> > > > > > Boyang
>> > > > > > > Chen, and Guozhang Wang.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Jason
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > Leonard Ge
>> > > > > Software Engineer Intern - Confluent
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> > > >
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang
