Hello Jun,

Thanks for your comments. I'm answering some of them inline below.

On Thu, Jul 16, 2020 at 3:44 PM Jun Rao <j...@confluent.io> wrote:

> Hi, Jason,
>
> Thanks for the updated KIP. Looks good overall. A few more comments below.
>
> 101. I still don't see a section on bootstrapping related issues. It would
> be useful to document if/how the following is supported.
> 101.1 Currently, we support auto broker id generation. Is this supported
> for bootstrap brokers?
> 101.2 As Colin mentioned, sometimes we may need to load the security
> credentials to be broker before it can be connected to. Could you provide a
> bit more detail on how this will work?
> 101.3 Currently, we use ZK to generate clusterId on a new cluster. With
> Raft, how does every broker generate the same clusterId in a distributed
> way?
>
> 200. It would be useful to document if the various special offsets (log
> start offset, recovery point, HWM, etc) for the Raft log are stored in the
> same existing checkpoint files or not.
> 200.1 Since the Raft log flushes every append, does that allow us to
> recover from a recovery point within the active segment or do we still need
> to scan the full segment including the recovery point? The former can be
> tricky since multiple records can fall into the same disk page and a
> subsequent flush may corrupt a page with previously flushed records.
>

I think we would still document the special offsets for the Raft log in the
existing checkpoint files. I will update the KIP.

I have not yet thought about optimizing our existing recovery process.
Flushing the Raft log on every append may open the door to some
optimization, but we are also considering ways to defer the
flush-on-every-append as future work, as suggested in the KIP. So for now we
will keep the recovery logic as is.

>
> 201. Configurations.
> 201.1 How do the Raft brokers get security related configs for inter broker
> communication? Is that based on the existing
> inter.broker.security.protocol?
>

We have a separate KIP proposal to address broker bootstrapping issues (it
also covers broker reconfiguration), and I believe Jason will publish it
soon. The main idea is that the security types would still be set via
"security.inter.broker.protocol" and "inter.broker.listener.name", but these
configs could not be altered while the broker is offline. When a broker is
started for the first time, users would need to set them in
"meta.properties" if necessary.

> 201.2 We have quorum.retry.backoff.max.ms and quorum.retry.backoff.ms, but
> only quorum.election.backoff.max.ms. This seems a bit inconsistent.
>

The current implementation does not use `quorum.retry.backoff.max.ms`, since
we use a static backoff at the connection layer that relies only on
`quorum.retry.backoff.ms`. Personally I think it is okay to remove the
former config for now, since we do not have a strong motivation to use
exponential backoff there. The election procedure does use a binary
exponential backoff, but I do not think we need a separate
`quorum.election.backoff.base.ms` or similar (the base is currently
hardcoded). I'm open to other thoughts if we believe making it configurable
is also important.

> 202. Metrics:
> 202.1 TotalTimeMs, InboundQueueTimeMs, HandleTimeMs, OutboundQueueTimeMs:
> Are those the same as existing totalTime, requestQueueTime, localTime,
> responseQueueTime? Could we reuse the existing ones with the tag
> request=[request-type]?
>

Yes we can; I'm going to remove those metrics from the KIP.

> 202.2. Could you explain what InboundChannelSize and OutboundChannelSize
> are?
>

Our current implementation uses another channel between the request handler
and the RaftClient: upon handling a request, the handler thread does nothing
but put it into a queue, which is then polled by the RaftClient. Similarly,
the generated response is passed back to the handler threads via a queue.
These metrics measure the sizes of those two queues.

> 202.3 ElectionLatencyMax/Avg: It seems that both should be windowed?
>

You're right, they are implemented as windowed sum / avg, and so are
"ReplicationLatencyMax/Avg" actually. I will update the KIP.

> 203. Quorum State: I assume that LeaderId will be kept consistently with
> LeaderEpoch. For example, if a follower transitions to candidate and bumps
> up LeaderEpoch, it will set leaderId to -1 and persist both in the Quorum
> state file. Is that correct?
>
> 204. I was thinking about a corner case when a Raft broker is partitioned
> off. This broker will then be in a continuous loop of bumping up the leader
> epoch, but failing to get enough votes. When the partitioning is removed,
> this broker's high leader epoch will force a leader election. I assume
> other Raft brokers can immediately advance their leader epoch passing the
> already bumped epoch such that leader election won't be delayed. Is that
> right?
>
> 205. In a JBOD setting, could we use the existing tool to move the Raft log
> from one disk to another?
>
> 206. The KIP doesn't mention the local metadata store derived from the Raft
> log. Will that be covered in a separate KIP?
>
> 207. Since this is a critical component. Could we add a section on the
> testing plan for correctness?
>
> 208. Performance. Do we plan to do group commit (e.g. buffer pending
> appends during a flush and then flush all accumulated pending records
> together in the next flush) for better throughput?
>

Yes, that is under discussion at the moment.
Buffering pending appends has some implicit impact on some of the Raft state
transitions, but I think that's doable. I will update the KIP to add a
"future work" section for this idea.

>
> 209. "the leader can actually defer fsync until it knows "quorum.size - 1"
> has get to a certain entry offset." Why is that "quorum.size - 1" instead
> of the majority of the quorum?
>

Yes, I meant to say "majority - 1" (minus one for the leader itself). I will
update the KIP.

> Thanks,
>
> Jun
>
> On Mon, Jul 13, 2020 at 9:43 AM Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi All,
> >
> > Just a quick update on the proposal. We have decided to move quorum
> > reassignment to a separate KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-642%3A+Dynamic+quorum+reassignment
> > .
> > The way this ties into cluster bootstrapping is complicated, so we felt we
> > needed a bit more time for validation. That leaves the core of this
> > proposal as quorum-based replication. If there are no further comments, we
> > will plan to start a vote later this week.
> >
> > Thanks,
> > Jason
> >
> > On Wed, Jun 24, 2020 at 10:43 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > @Jun Rao <jun...@gmail.com>
> > >
> > > Regarding your comment about log compaction. After some deep-diving into
> > > this we've decided to propose a new snapshot-based log cleaning mechanism
> > > which would be used to replace the current compaction mechanism for this
> > > meta log. A new KIP will be proposed specifically for this idea.
> > >
> > > All,
> > >
> > > I've updated the KIP wiki a bit updating one config
> > > "election.jitter.max.ms" to "election.backoff.max.ms" to make it more
> > > clear about the usage: the configured value will be the upper bound of
> > > the binary exponential backoff time after a failed election, before
> > > starting a new one.
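
To make the "upper bound of the binary exponential backoff" semantics
concrete, here is a minimal Python sketch. It is only an illustration under
stated assumptions: the function name, the 50 ms base, and the optional
jitter are hypothetical. The thread only says that the base is hardcoded and
that "election.backoff.max.ms" caps the wait.

```python
import random


def election_backoff_ms(failed_attempts, max_backoff_ms, base_ms=50, rng=None):
    """Capped binary exponential backoff after `failed_attempts` failed elections.

    base_ms=50 is a hypothetical value; the real hardcoded base is not
    stated in this thread. max_backoff_ms plays the role of
    election.backoff.max.ms.
    """
    if failed_attempts < 1:
        return 0
    # Double the wait for each additional failed election: base, 2*base, 4*base, ...
    # The exponent is clamped so the shift stays small for large attempt counts.
    uncapped = base_ms << min(failed_attempts - 1, 30)
    bound = min(uncapped, max_backoff_ms)
    if rng is None:
        return bound
    # Optional jitter (an assumption, not from the thread): pick uniformly in
    # [0, bound] so competing candidates are unlikely to retry simultaneously.
    return rng.randint(0, bound)


if __name__ == "__main__":
    for attempt in range(1, 8):
        print(attempt, election_backoff_ms(attempt, max_backoff_ms=1000))
```

With these assumed values the wait doubles from 50 ms until it reaches the
1000 ms cap and then stays there.
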
> > > > > > > > > > > > Guozhang > > > > > > > > > > > > On Fri, Jun 12, 2020 at 9:26 AM Boyang Chen < > reluctanthero...@gmail.com> > > > wrote: > > > > > > > Thanks for the suggestions Guozhang. > > > > > > > > On Thu, Jun 11, 2020 at 2:51 PM Guozhang Wang <wangg...@gmail.com> > > > wrote: > > > > > > > > > Hello Boyang, > > > > > > > > > > Thanks for the updated information. A few questions here: > > > > > > > > > > 1) Should the quorum-file also update to support multi-raft? > > > > > > > > > > I'm neutral about this, as we don't know yet how the multi-raft > > modules > > > > would behave. If > > > > we have different threads operating different raft groups, > > consolidating > > > > the `checkpoint` files seems > > > > not reasonable. We could always add `multi-quorum-file` later if > > > possible. > > > > > > > > 2) In the previous proposal, there's fields in the FetchQuorumRecords > > > like > > > > > latestDirtyOffset, is that dropped intentionally? > > > > > > > > > > I dropped the latestDirtyOffset since it is associated with the log > > > > compaction discussion. This is beyond this KIP scope and we could > > > > potentially get a separate KIP to talk about it. > > > > > > > > > > > > > 3) I think we also need to elaborate a bit more details regarding > > when > > > to > > > > > send metadata request and discover-brokers; currently we only > > discussed > > > > > during bootstrap how these requests would be sent. I think the > > > following > > > > > scenarios would also need these requests > > > > > > > > > > 3.a) As long as a broker does not know the current quorum > (including > > > the > > > > > leader and the voters), it should continue periodically ask other > > > brokers > > > > > via "metadata. > > > > > 3.b) As long as a broker does not know all the current quorum > voter's > > > > > connections, it should continue periodically ask other brokers via > > > > > "discover-brokers". 
> > > > > 3.c) When the leader's fetch timeout elapsed, it should send > metadata > > > > > request. > > > > > > > > > > Make sense, will add to the KIP. > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > On Wed, Jun 10, 2020 at 5:20 PM Boyang Chen < > > > reluctanthero...@gmail.com> > > > > > wrote: > > > > > > > > > > > Hey all, > > > > > > > > > > > > follow-up on the previous email, we made some more updates: > > > > > > > > > > > > 1. The Alter/DescribeQuorum RPCs are also re-structured to use > > > > > multi-raft. > > > > > > > > > > > > 2. We add observer status into the DescribeQuorumResponse as we > see > > > it > > > > > is a > > > > > > low hanging fruit which is very useful for user debugging and > > > > > reassignment. > > > > > > > > > > > > 3. The FindQuorum RPC is replaced with DiscoverBrokers RPC, which > > is > > > > > purely > > > > > > in charge of discovering broker connections in a gossip manner. > The > > > > > quorum > > > > > > leader discovery is piggy-back on the Metadata RPC for the topic > > > > > partition > > > > > > leader, which in our case is the single metadata partition for > the > > > > > version > > > > > > one. > > > > > > > > > > > > Let me know if you have any questions. > > > > > > > > > > > > Boyang > > > > > > > > > > > > On Tue, Jun 9, 2020 at 11:01 PM Boyang Chen < > > > > reluctanthero...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > Hey all, > > > > > > > > > > > > > > Thanks for the great discussions so far. I'm posting some KIP > > > updates > > > > > > from > > > > > > > our working group discussion: > > > > > > > > > > > > > > 1. We will be changing the core RPCs from single-raft API to > > > > > multi-raft. > > > > > > > This means all protocols will be "batch" in the first version, > > but > > > > the > > > > > > KIP > > > > > > > itself only illustrates the design for a single metadata topic > > > > > partition. 
> > > > > > > The reason is to "keep the door open" for future extensions of > > this > > > > > piece > > > > > > > of module such as a sharded controller or general quorum based > > > topic > > > > > > > replication, beyond the current Kafka replication protocol. > > > > > > > > > > > > > > 2. We will piggy-back on the current Kafka Fetch API instead of > > > > > inventing > > > > > > > a new FetchQuorumRecords RPC. The motivation is about the same > as > > > #1 > > > > as > > > > > > > well as making the integration work easier, instead of letting > > two > > > > > > similar > > > > > > > RPCs diverge. > > > > > > > > > > > > > > 3. In the EndQuorumEpoch protocol, instead of only sending the > > > > request > > > > > to > > > > > > > the most caught-up voter, we shall broadcast the information to > > all > > > > > > voters, > > > > > > > with a sorted voter list in descending order of their > > corresponding > > > > > > > replicated offset. In this way, the top voter will become a > > > candidate > > > > > > > immediately, while the other voters shall wait for an > exponential > > > > > > back-off > > > > > > > to trigger elections, which helps ensure the top voter gets > > > elected, > > > > > and > > > > > > > the election eventually happens when the top voter is not > > > responsive. > > > > > > > > > > > > > > Please see the updated KIP and post any questions or concerns > on > > > the > > > > > > > mailing thread. > > > > > > > > > > > > > > Boyang > > > > > > > > > > > > > > On Fri, May 8, 2020 at 5:26 PM Jun Rao <j...@confluent.io> > wrote: > > > > > > > > > > > > > >> Hi, Guozhang and Jason, > > > > > > >> > > > > > > >> Thanks for the reply. A couple of more replies. > > > > > > >> > > > > > > >> 102. Still not sure about this. How is the tombstone issue > > > addressed > > > > > in > > > > > > >> the > > > > > > >> non-voter and the observer. 
They can die at any point and > > restart > > > > at > > > > > an > > > > > > >> arbitrary later time, and the advancing of the firstDirty > offset > > > and > > > > > the > > > > > > >> removal of the tombstone can happen independently. > > > > > > >> > > > > > > >> 106. I agree that it would be less confusing if we used > "epoch" > > > > > instead > > > > > > of > > > > > > >> "leader epoch" consistently. > > > > > > >> > > > > > > >> Jun > > > > > > >> > > > > > > >> On Thu, May 7, 2020 at 4:04 PM Guozhang Wang < > > wangg...@gmail.com> > > > > > > wrote: > > > > > > >> > > > > > > >> > Thanks Jun. Further replies are in-lined. > > > > > > >> > > > > > > > >> > On Mon, May 4, 2020 at 11:58 AM Jun Rao <j...@confluent.io> > > > wrote: > > > > > > >> > > > > > > > >> > > Hi, Guozhang, > > > > > > >> > > > > > > > > >> > > Thanks for the reply. A few more replies inlined below. > > > > > > >> > > > > > > > > >> > > On Sun, May 3, 2020 at 6:33 PM Guozhang Wang < > > > > wangg...@gmail.com> > > > > > > >> wrote: > > > > > > >> > > > > > > > > >> > > > Hello Jun, > > > > > > >> > > > > > > > > > >> > > > Thanks for your comments! I'm replying inline below: > > > > > > >> > > > > > > > > > >> > > > On Fri, May 1, 2020 at 12:36 PM Jun Rao < > j...@confluent.io > > > > > > > > wrote: > > > > > > >> > > > > > > > > > >> > > > > 101. Bootstrapping related issues. > > > > > > >> > > > > 101.1 Currently, we support auto broker id generation. > > Is > > > > this > > > > > > >> > > supported > > > > > > >> > > > > for bootstrap brokers? > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > The vote ids would just be the broker ids. > > > "bootstrap.servers" > > > > > > >> would be > > > > > > >> > > > similar to what client configs have today, where > > > > "quorum.voters" > > > > > > >> would > > > > > > >> > be > > > > > > >> > > > pre-defined config values. 
> > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > My question was on the auto generated broker id. > Currently, > > > the > > > > > > broker > > > > > > >> > can > > > > > > >> > > choose to have its broker Id auto generated. The > generation > > is > > > > > done > > > > > > >> > through > > > > > > >> > > ZK to guarantee uniqueness. Without ZK, it's not clear how > > the > > > > > > broker > > > > > > >> id > > > > > > >> > is > > > > > > >> > > auto generated. "quorum.voters" also can't be set > statically > > > if > > > > > > broker > > > > > > >> > ids > > > > > > >> > > are auto generated. > > > > > > >> > > > > > > > > >> > > Jason has explained some ideas that we've discussed so > far, > > > the > > > > > > >> reason we > > > > > > >> > intentional did not include them so far is that we feel it > is > > > > > out-side > > > > > > >> the > > > > > > >> > scope of KIP-595. Under the umbrella of KIP-500 we should > > > > definitely > > > > > > >> > address them though. > > > > > > >> > > > > > > > >> > On the high-level, our belief is that "joining a quorum" and > > > > > "joining > > > > > > >> (or > > > > > > >> > more specifically, registering brokers in) the cluster" > would > > be > > > > > > >> > de-coupled a bit, where the former should be completed > before > > we > > > > do > > > > > > the > > > > > > >> > latter. More specifically, assuming the quorum is already up > > and > > > > > > >> running, > > > > > > >> > after the newly started broker found the leader of the > quorum > > it > > > > can > > > > > > >> send a > > > > > > >> > specific RegisterBroker request including its listener / > > > protocol > > > > / > > > > > > etc, > > > > > > >> > and upon handling it the leader can send back the uniquely > > > > generated > > > > > > >> broker > > > > > > >> > id to the new broker, while also executing the > > "startNewBroker" > > > > > > >> callback as > > > > > > >> > the controller. 
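
The registration flow described above could be sketched roughly as follows.
This is a toy, in-memory illustration and not the proposed implementation:
"RegisterBroker" and "startNewBroker" are the names used in the thread, while
the class, the counter-based id scheme, and the starting id of 1000 are
assumptions made for the example.

```python
import itertools


class QuorumLeaderSketch:
    """Toy stand-in for the quorum leader acting as the controller."""

    def __init__(self, first_id=1000):
        # Hypothetical id scheme: a simple monotonically increasing counter.
        self._ids = itertools.count(first_id)
        self.brokers = {}   # broker id -> listener / protocol info
        self.started = []   # broker ids for which the callback has run

    def handle_register_broker(self, listeners):
        """Handle a RegisterBroker request and return a unique broker id."""
        broker_id = next(self._ids)
        self.brokers[broker_id] = dict(listeners)
        self._start_new_broker(broker_id)
        return broker_id

    def _start_new_broker(self, broker_id):
        # Placeholder for the "startNewBroker" controller callback.
        self.started.append(broker_id)


if __name__ == "__main__":
    leader = QuorumLeaderSketch()
    print(leader.handle_register_broker({"PLAINTEXT": "host-a:9092"}))
    print(leader.handle_register_broker({"PLAINTEXT": "host-b:9092"}))
```

In a real quorum the id allocation would presumably have to be replicated
through the Raft log, so that a newly elected leader does not hand out an id
that an earlier leader already assigned; the sketch leaves that out.
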
> > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > > >> > > > > 102. Log compaction. One weak spot of log compaction > is > > > for > > > > > the > > > > > > >> > > consumer > > > > > > >> > > > to > > > > > > >> > > > > deal with deletes. When a key is deleted, it's > retained > > > as a > > > > > > >> > tombstone > > > > > > >> > > > > first and then physically removed. If a client misses > > the > > > > > > >> tombstone > > > > > > >> > > > > (because it's physically removed), it may not be able > to > > > > > update > > > > > > >> its > > > > > > >> > > > > metadata properly. The way we solve this in Kafka is > > based > > > > on > > > > > a > > > > > > >> > > > > configuration (log.cleaner.delete.retention.ms) and > we > > > > > expect a > > > > > > >> > > consumer > > > > > > >> > > > > having seen an old key to finish reading the deletion > > > > > tombstone > > > > > > >> > within > > > > > > >> > > > that > > > > > > >> > > > > time. There is no strong guarantee for that since a > > broker > > > > > could > > > > > > >> be > > > > > > >> > > down > > > > > > >> > > > > for a long time. It would be better if we can have a > > more > > > > > > reliable > > > > > > >> > way > > > > > > >> > > of > > > > > > >> > > > > dealing with deletes. > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > We propose to capture this in the "FirstDirtyOffset" > field > > > of > > > > > the > > > > > > >> > quorum > > > > > > >> > > > record fetch response: the offset is the maximum offset > > that > > > > log > > > > > > >> > > compaction > > > > > > >> > > > has reached up to. If the follower has fetched beyond > this > > > > > offset > > > > > > it > > > > > > >> > > means > > > > > > >> > > > itself is safe hence it has seen all records up to that > > > > offset. 
> > > > > On > > > > > > >> > > getting > > > > > > >> > > > the response, the follower can then decide if its end > > offset > > > > > > >> actually > > > > > > >> > > below > > > > > > >> > > > that dirty offset (and hence may miss some tombstones). > If > > > > > that's > > > > > > >> the > > > > > > >> > > case: > > > > > > >> > > > > > > > > > >> > > > 1) Naively, it could re-bootstrap metadata log from the > > very > > > > > > >> beginning > > > > > > >> > to > > > > > > >> > > > catch up. > > > > > > >> > > > 2) During that time, it would refrain itself from > > answering > > > > > > >> > > MetadataRequest > > > > > > >> > > > from any clients. > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > I am not sure that the "FirstDirtyOffset" field fully > > > addresses > > > > > the > > > > > > >> > issue. > > > > > > >> > > Currently, the deletion tombstone is not removed > immediately > > > > > after a > > > > > > >> > round > > > > > > >> > > of cleaning. It's removed after a delay in a subsequent > > round > > > of > > > > > > >> > cleaning. > > > > > > >> > > Consider an example where a key insertion is at offset 200 > > > and a > > > > > > >> deletion > > > > > > >> > > tombstone of the key is at 400. Initially, > FirstDirtyOffset > > is > > > > at > > > > > > >> 300. A > > > > > > >> > > follower/observer fetches from offset 0 and fetches the > key > > > at > > > > > > offset > > > > > > >> > 200. > > > > > > >> > > A few rounds of cleaning happen. FirstDirtyOffset is at > 500 > > > and > > > > > the > > > > > > >> > > tombstone at 400 is physically removed. The > > follower/observer > > > > > > >> continues > > > > > > >> > the > > > > > > >> > > fetch, but misses offset 400. It catches all the way to > > > > > > >> FirstDirtyOffset > > > > > > >> > > and declares its metadata as ready. However, its metadata > > > could > > > > be > > > > > > >> stale > > > > > > >> > > since it actually misses the deletion of the key. 
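
The scenario in this example can be replayed with a small Python simulation.
It is a sketch under simplifying assumptions: the log is modelled as a dict
from offset to (key, value), a value of None stands for a tombstone, and the
helper names are made up for the illustration.

```python
def replay(state, log, start, end):
    """Apply every record still present in `log` with start <= offset < end."""
    for offset in sorted(log):
        if start <= offset < end:
            key, value = log[offset]
            if value is None:
                # None marks a deletion tombstone.
                state.pop(key, None)
            else:
                state[key] = value
    return state


def simulate_missed_tombstone():
    # Insertion of key "k" at offset 200, tombstone for "k" at offset 400.
    leader_log = {200: ("k", "v"), 400: ("k", None)}
    follower = {}

    # FirstDirtyOffset is 300: the follower fetches [0, 300) and sees the insertion.
    replay(follower, leader_log, 0, 300)

    # A few rounds of cleaning later, the insertion and then the tombstone
    # have been physically removed, and FirstDirtyOffset has moved to 500.
    del leader_log[200]
    del leader_log[400]
    first_dirty_offset = 500

    # The follower resumes from 300 and catches up to FirstDirtyOffset,
    # but offset 400 no longer exists, so it never sees the deletion.
    replay(follower, leader_log, 300, first_dirty_offset)

    # A reader that starts from scratch on the cleaned log sees no such key.
    fresh = replay({}, leader_log, 0, first_dirty_offset)
    return follower, fresh


if __name__ == "__main__":
    follower_state, fresh_state = simulate_missed_tombstone()
    print("follower:", follower_state)
    print("fresh reader:", fresh_state)
```

The follower ends up still holding the key while a fresh reader does not,
which is the staleness described above.
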
> > > > > > >> > > > > > > > > >> > > Yeah good question, I should have put more details in my > > > > > explanation > > > > > > >> :) > > > > > > >> > > > > > > > >> > The idea is that we will adjust the log compaction for this > > raft > > > > > based > > > > > > >> > metadata log: before more details to be explained, since we > > have > > > > two > > > > > > >> types > > > > > > >> > of "watermarks" here, whereas in Kafka the watermark > indicates > > > > where > > > > > > >> every > > > > > > >> > replica have replicated up to and in Raft the watermark > > > indicates > > > > > > where > > > > > > >> the > > > > > > >> > majority of replicas (here only indicating voters of the > > quorum, > > > > not > > > > > > >> > counting observers) have replicated up to, let's call them > > Kafka > > > > > > >> watermark > > > > > > >> > and Raft watermark. For this special log, we would maintain > > both > > > > > > >> > watermarks. > > > > > > >> > > > > > > > >> > When log compacting on the leader, we would only compact up > to > > > the > > > > > > Kafka > > > > > > >> > watermark, i.e. if there is at least one voter who have not > > > > > replicated > > > > > > >> an > > > > > > >> > entry, it would not be compacted. The "dirty-offset" is the > > > offset > > > > > > that > > > > > > >> > we've compacted up to and is communicated to other voters, > and > > > the > > > > > > other > > > > > > >> > voters would also compact up to this value --- i.e. the > > > difference > > > > > > here > > > > > > >> is > > > > > > >> > that instead of letting each replica doing log compaction > > > > > > independently, > > > > > > >> > we'll have the leader to decide upon which offset to compact > > to, > > > > and > > > > > > >> > propagate this value to others to follow, in a more > > coordinated > > > > > > manner. 
> > > > > > >> > Also note when there are new voters joining the quorum who > has > > > not > > > > > > >> > replicated up to the dirty-offset, of because of other > issues > > > they > > > > > > >> > truncated their logs to below the dirty-offset, they'd have > to > > > > > > >> re-bootstrap > > > > > > >> > from the beginning, and during this period of time the > leader > > > > > learned > > > > > > >> about > > > > > > >> > this lagging voter would not advance the watermark (also it > > > would > > > > > not > > > > > > >> > decrement it), and hence not compacting either, until the > > > voter(s) > > > > > has > > > > > > >> > caught up to that dirty-offset. > > > > > > >> > > > > > > > >> > So back to your example above, before the bootstrap voter > gets > > > to > > > > > 300 > > > > > > no > > > > > > >> > log compaction would happen on the leader; and until later > > when > > > > the > > > > > > >> voter > > > > > > >> > have got to beyond 400 and hence replicated that tombstone, > > the > > > > log > > > > > > >> > compaction would possibly get to that tombstone and remove > it. > > > Say > > > > > > >> later it > > > > > > >> > the leader's log compaction reaches 500, it can send this > back > > > to > > > > > the > > > > > > >> voter > > > > > > >> > who can then also compact locally up to 500. > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > 105. Quorum State: In addition to VotedId, do we need > > the > > > > > epoch > > > > > > >> > > > > corresponding to VotedId? Over time, the same broker > Id > > > > could > > > > > be > > > > > > >> > voted > > > > > > >> > > in > > > > > > >> > > > > different generations with different epoch. > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > Hmm, this is a good point. 
Originally I think the > > > > "LeaderEpoch" > > > > > > >> field > > > > > > >> > in > > > > > > >> > > > that file is corresponding to the "latest known leader > > > epoch", > > > > > not > > > > > > >> the > > > > > > >> > > > "current leader epoch". For example, if the current > epoch > > is > > > > N, > > > > > > and > > > > > > >> > then > > > > > > >> > > a > > > > > > >> > > > vote-request with epoch N+1 is received and the voter > > > granted > > > > > the > > > > > > >> vote > > > > > > >> > > for > > > > > > >> > > > it, then it means for this voter it knows the "latest > > epoch" > > > > is > > > > > N > > > > > > + > > > > > > >> 1 > > > > > > >> > > > although it is unknown if that sending candidate will > > indeed > > > > > > become > > > > > > >> the > > > > > > >> > > new > > > > > > >> > > > leader (which would only be notified via begin-quorum > > > > request). > > > > > > >> > However, > > > > > > >> > > > when persisting the quorum state, we would encode > > > leader-epoch > > > > > to > > > > > > >> N+1, > > > > > > >> > > > while the leaderId to be the older leader. > > > > > > >> > > > > > > > > > >> > > > But now thinking about this a bit more, I feel we should > > use > > > > two > > > > > > >> > separate > > > > > > >> > > > epochs, one for the "lates known" and one for the > > "current" > > > to > > > > > > pair > > > > > > >> > with > > > > > > >> > > > the leaderId. I will update the wiki page. > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > Hmm, it's kind of weird to bump up the leader epoch before > > the > > > > new > > > > > > >> leader > > > > > > >> > > is actually elected, right. > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > 106. 
"OFFSET_OUT_OF_RANGE: Used in the > > FetchQuorumRecords > > > > API > > > > > to > > > > > > >> > > indicate > > > > > > >> > > > > that the follower has fetched from an invalid offset > and > > > > > should > > > > > > >> > > truncate > > > > > > >> > > > to > > > > > > >> > > > > the offset/epoch indicated in the response." Observers > > > can't > > > > > > >> truncate > > > > > > >> > > > their > > > > > > >> > > > > logs. What should they do with OFFSET_OUT_OF_RANGE? > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > I'm not sure if I understand your question? Observers > > should > > > > > still > > > > > > >> be > > > > > > >> > > able > > > > > > >> > > > to truncate their logs as well. > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > Hmm, I thought only the quorum nodes have local logs and > > > > observers > > > > > > >> don't? > > > > > > >> > > > > > > > > >> > > > 107. "The leader will continue sending BeginQuorumEpoch > to > > > > each > > > > > > >> known > > > > > > >> > > > voter > > > > > > >> > > > > until it has received its endorsement." If a voter is > > down > > > > > for a > > > > > > >> long > > > > > > >> > > > time, > > > > > > >> > > > > sending BeginQuorumEpoch seems to add unnecessary > > > overhead. > > > > > > >> > Similarly, > > > > > > >> > > > if a > > > > > > >> > > > > follower stops sending FetchQuorumRecords, does the > > leader > > > > > keep > > > > > > >> > sending > > > > > > >> > > > > BeginQuorumEpoch? > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > Regarding BeginQuorumEpoch: that is a good point. The > > > > > > >> > begin-quorum-epoch > > > > > > >> > > > request is for voters to quickly get the new leader > > > > information; > > > > > > >> > however > > > > > > >> > > > even if they do not get them they can still eventually > > learn > > > > > about > > > > > > >> that > > > > > > >> > > > from others via gossiping FindQuorum. 
I think we can > > adjust > > > > the > > > > > > >> logic > > > > > > >> > to > > > > > > >> > > > e.g. exponential back-off or with a limited num.retries. > > > > > > >> > > > > > > > > > >> > > > Regarding FetchQuorumRecords: if the follower sends > > > > > > >> FetchQuorumRecords > > > > > > >> > > > already, it means that follower already knows that the > > > broker > > > > is > > > > > > the > > > > > > >> > > > leader, and hence we can stop retrying BeginQuorumEpoch; > > > > however > > > > > > it > > > > > > >> is > > > > > > >> > > > possible that after a follower sends FetchQuorumRecords > > > > already, > > > > > > >> > suddenly > > > > > > >> > > > it stops send it (possibly because it learned about a > > higher > > > > > epoch > > > > > > >> > > leader), > > > > > > >> > > > and hence this broker may be a "zombie" leader and we > > > propose > > > > to > > > > > > use > > > > > > >> > the > > > > > > >> > > > fetch.timeout to let the leader to try to verify if it > has > > > > > already > > > > > > >> been > > > > > > >> > > > stale. > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > It just seems that we should handle these two cases in a > > > > > consistent > > > > > > >> way? 
> > > > > > >> > > > > > > > > >> > > Yes I agree, on the leader's side, the FetchQuorumRecords > > > from a > > > > > > >> follower > > > > > > >> > could mean that we no longer needs to send BeginQuorumEpoch > > > > anymore > > > > > > --- > > > > > > >> and > > > > > > >> > it is already part of our current implementations in > > > > > > >> > https://github.com/confluentinc/kafka/commits/kafka-raft > > > > > > >> > > > > > > > >> > > > > > > > >> > > Thanks, > > > > > > >> > > > > > > > > >> > > Jun > > > > > > >> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > >> > > > > Jun > > > > > > >> > > > > > > > > > > >> > > > > On Wed, Apr 29, 2020 at 8:56 PM Guozhang Wang < > > > > > > wangg...@gmail.com > > > > > > >> > > > > > > > >> > > > wrote: > > > > > > >> > > > > > > > > > > >> > > > > > Hello Leonard, > > > > > > >> > > > > > > > > > > > >> > > > > > Thanks for your comments, I'm relying in line below: > > > > > > >> > > > > > > > > > > > >> > > > > > On Wed, Apr 29, 2020 at 1:58 AM Wang (Leonard) Ge < > > > > > > >> > w...@confluent.io> > > > > > > >> > > > > > wrote: > > > > > > >> > > > > > > > > > > > >> > > > > > > Hi Kafka developers, > > > > > > >> > > > > > > > > > > > > >> > > > > > > It's great to see this proposal and it took me > some > > > time > > > > > to > > > > > > >> > finish > > > > > > >> > > > > > reading > > > > > > >> > > > > > > it. > > > > > > >> > > > > > > > > > > > > >> > > > > > > And I have the following questions about the > > Proposal: > > > > > > >> > > > > > > > > > > > > >> > > > > > > - How do we plan to test this design to ensure > > its > > > > > > >> > correctness? 
> > > > > > >> > > Or > > > > > > >> > > > > > more > > > > > > >> > > > > > > broadly, how do we ensure that our new ‘pull’ > > based > > > > > model > > > > > > >> is > > > > > > >> > > > > > functional > > > > > > >> > > > > > > and > > > > > > >> > > > > > > correct given that it is different from the > > > original > > > > > RAFT > > > > > > >> > > > > > implementation > > > > > > >> > > > > > > which has formal proof of correctness? > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > We have two planned verifications on the correctness > > and > > > > > > >> liveness > > > > > > >> > of > > > > > > >> > > > the > > > > > > >> > > > > > design. One is via model verification (TLA+) > > > > > > >> > > > > > https://github.com/guozhangwang/kafka-specification > > > > > > >> > > > > > > > > > > > >> > > > > > Another is via the concurrent simulation tests > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > https://github.com/confluentinc/kafka/commit/5c0c054597d2d9f458cad0cad846b0671efa2d91 > > > > > > >> > > > > > > > > > > > >> > > > > > - Have we considered any sensible defaults for > the > > > > > > >> > configuration, > > > > > > >> > > > i.e. > > > > > > >> > > > > > > all the election timeout, fetch time out, etc.? > > Or > > > we > > > > > > want > > > > > > >> to > > > > > > >> > > > leave > > > > > > >> > > > > > > this to > > > > > > >> > > > > > > a later stage when we do the performance > testing, > > > > etc. 
> > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > This is a good question. The reason we did not set > any > > > > > default > > > > > > >> > values > > > > > > >> > > > for > > > > > > >> > > > > > the timeout configurations is that we think it may > > take > > > > some > > > > > > >> > > > benchmarking > > > > > > >> > > > > > experiments to get these defaults right. Some > > high-level > > > > > > >> principles > > > > > > >> > > to > > > > > > >> > > > > > consider: 1) the fetch.timeout should be on the > > same > > > > > scale > > > > > > >> as the > > > > > > >> > > zk > > > > > > >> > > > > > session timeout, which is now 18 seconds by default > -- > > > in > > > > > > >> practice > > > > > > >> > > > we've > > > > > > >> > > > > > seen unstable networks having more than 10 secs of > > > > transient > > > > > > >> > > > > connectivity, > > > > > > >> > > > > > 2) the election.timeout, however, should be smaller > > than > > > > the > > > > > > >> fetch > > > > > > >> > > > > timeout > > > > > > >> > > > > > as is also suggested as a practical optimization in > > > > > > the literature: > > > > > > >> > > > > > > > > > > https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf > > > > > > >> > > > > > > > > > > > >> > > > > > More discussion can be found here: > > > > > > >> > > > > > > > > > > > https://github.com/confluentinc/kafka/pull/301/files#r415420081 > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > - Have we considered piggybacking > > > `BeginQuorumEpoch` > > > > > with > > > > > > >> the > > > > > > >> > ` > > > > > > >> > > > > > > FetchQuorumRecords`? 
I might be missing > something > > > > > obvious > > > > > > >> but > > > > > > >> > I > > > > > > >> > > am > > > > > > >> > > > > > just > > > > > > >> > > > > > > wondering why don’t we just use the > `FindQuorum` > > > and > > > > > > >> > > > > > > `FetchQuorumRecords` > > > > > > >> > > > > > > APIs and remove the `BeginQuorumEpoch` API? > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > Note that Begin/EndQuorumEpoch is sent from leader > -> > > > > other > > > > > > >> voter > > > > > > >> > > > > > followers, while FindQuorum / Fetch are sent from > > > follower > > > > > to > > > > > > >> > leader. > > > > > > >> > > > > > Arguably one can eventually realize the new leader > and > > > > epoch > > > > > > via > > > > > > >> > > > > gossiping > > > > > > >> > > > > > FindQuorum, but that could in practice require a > long > > > > delay. > > > > > > >> > Having a > > > > > > >> > > > > > leader -> other voters request helps the new leader > > > epoch > > > > to > > > > > > be > > > > > > >> > > > > propagated > > > > > > >> > > > > > faster under a pull model. > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > - And about the `FetchQuorumRecords` response > > > schema, > > > > > in > > > > > > >> the > > > > > > >> > > > > `Records` > > > > > > >> > > > > > > field of the response, is it just one record or > > all > > > > the > > > > > > >> > records > > > > > > >> > > > > > starting > > > > > > >> > > > > > > from the FetchOffset? It seems a lot more > > efficient > > > > if > > > > > we > > > > > > >> sent > > > > > > >> > > all > > > > > > >> > > > > the > > > > > > >> > > > > > > records during the bootstrapping of the > brokers. > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > Yes the fetching is batched: FetchOffset is just the > > > > > starting > > > > > > >> > offset > > > > > > >> > > of > > > > > > >> > > > > the > > > > > > >> > > > > > batch of records. 
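A rough illustration of the batched fetch described in the reply above. This is a sketch only: the function name `fetch_batch`, the in-memory list standing in for the Raft log, and the `max_records` limit are assumptions made for the example and are not taken from the KIP's `FetchQuorumRecords` schema. It shows only that the fetch offset marks where a batch begins, and that a follower catches up by repeatedly fetching from the offset after the last record it received.

```python
from typing import List, Tuple


def fetch_batch(log: List[str], fetch_offset: int, max_records: int) -> Tuple[List[str], int]:
    """Return the batch of records starting at fetch_offset and the next offset to fetch.

    `log` is a hypothetical in-memory stand-in for the leader's log.
    """
    if fetch_offset < 0 or fetch_offset > len(log):
        raise ValueError("fetch offset out of range")
    batch = log[fetch_offset:fetch_offset + max_records]
    return batch, fetch_offset + len(batch)


# A follower catching up from offset 0, two records per fetch.
leader_log = ["r0", "r1", "r2", "r3", "r4"]
follower_log: List[str] = []
next_offset = 0
while True:
    batch, next_offset = fetch_batch(leader_log, next_offset, max_records=2)
    if not batch:
        break  # an empty batch means the follower has caught up
    follower_log.extend(batch)
```

With a larger `max_records`, a bootstrapping broker would need fewer round trips to replicate the same log, which is the efficiency point raised in the question.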
> > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > - Regarding the disruptive broker issues, does > > our > > > > pull > > > > > > >> based > > > > > > >> > > > model > > > > > > >> > > > > > > suffer from it? If so, have we considered the > > > > Pre-Vote > > > > > > >> stage? > > > > > > >> > If > > > > > > >> > > > > not, > > > > > > >> > > > > > > why? > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > The disruptive broker issue is described in the original Raft > > > paper > > > > > > >> and is > > > > > > >> > > a > > > > > > >> > > > > > result of the push model design. Our analysis showed > > > that > > > > > with > > > > > > >> the > > > > > > >> > > pull > > > > > > >> > > > > > model it is no longer an issue. > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > Thanks a lot for putting this up, and I hope that > my > > > > > > questions > > > > > > >> > can > > > > > > >> > > be > > > > > > >> > > > > of > > > > > > >> > > > > > > some value to make this KIP better. > > > > > > >> > > > > > > > > > > > > >> > > > > > > Hope to hear from you soon! > > > > > > >> > > > > > > > > > > > > >> > > > > > > Best wishes, > > > > > > >> > > > > > > Leonard > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > On Wed, Apr 29, 2020 at 1:46 AM Colin McCabe < > > > > > > >> cmcc...@apache.org > > > > > > >> > > > > > > > > >> > > > > wrote: > > > > > > >> > > > > > > > > > > > > >> > > > > > > > Hi Jason, > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > It's amazing to see this coming together :) > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > I haven't had a chance to read in detail, but I > > read > > > > the > > > > > > >> > outline > > > > > > >> > > > and > > > > > > >> > > > > a > > > > > > >> > > > > > > few > > > > > > >> > > > > > > > things jumped out at me. 
> > > > > > >> > > > > > > > > > > > > > >> > > > > > > > First, for every epoch that is 32 bits rather > than > > > > 64, I > > > > > > >> sort > > > > > > >> > of > > > > > > >> > > > > wonder > > > > > > >> > > > > > > if > > > > > > >> > > > > > > > that's a good long-term choice. I keep reading > > > about > > > > > > stuff > > > > > > >> > like > > > > > > >> > > > > this: > > > > > > >> > > > > > > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1277 > > > > . > > > > > > >> > > Obviously, > > > > > > >> > > > > > that > > > > > > >> > > > > > > > JIRA is about zxid, which increments much faster > > > than > > > > we > > > > > > >> expect > > > > > > >> > > > these > > > > > > >> > > > > > > > leader epochs to, but it would still be good to > > see > > > > some > > > > > > >> rough > > > > > > >> > > > > > > calculations > > > > > > >> > > > > > > > about how long 32 bits (or really, 31 bits) will > > > last > > > > us > > > > > > in > > > > > > >> the > > > > > > >> > > > cases > > > > > > >> > > > > > > where > > > > > > >> > > > > > > > we're using it, and what the space savings we're > > > > getting > > > > > > >> really > > > > > > >> > > is. > > > > > > >> > > > > It > > > > > > >> > > > > > > > seems like in most cases the tradeoff may not be > > > worth > > > > > it? > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > Another thing I've been thinking about is how we > > do > > > > > > >> > > > bootstrapping. 
I > > > > > > >> > > > > > > > would prefer to be in a world where formatting a > > new > > > > > Kafka > > > > > > >> node > > > > > > >> > > > was a > > > > > > >> > > > > > > first > > > > > > >> > > > > > > > class operation explicitly initiated by the > admin, > > > > > rather > > > > > > >> than > > > > > > >> > > > > > something > > > > > > >> > > > > > > > that happened implicitly when you started up the > > > > broker > > > > > > and > > > > > > >> > > things > > > > > > >> > > > > > > "looked > > > > > > >> > > > > > > > blank." > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > The first problem is that things can "look > blank" > > > > > > >> accidentally > > > > > > >> > if > > > > > > >> > > > the > > > > > > >> > > > > > > > storage system is having a bad day. Clearly in > > the > > > > > > non-Raft > > > > > > >> > > world, > > > > > > >> > > > > > this > > > > > > >> > > > > > > > leads to data loss if the broker that is > > (re)started > > > > > this > > > > > > >> way > > > > > > >> > was > > > > > > >> > > > the > > > > > > >> > > > > > > > leader for some partitions. > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > The second problem is that we have a bit of a > > > chicken > > > > > and > > > > > > >> egg > > > > > > >> > > > problem > > > > > > >> > > > > > > with > > > > > > >> > > > > > > > certain configuration keys. For example, maybe > > you > > > > want > > > > > > to > > > > > > >> > > > configure > > > > > > >> > > > > > > some > > > > > > >> > > > > > > > connection security settings in your cluster, > but > > > you > > > > > > don't > > > > > > >> > want > > > > > > >> > > > them > > > > > > >> > > > > > to > > > > > > >> > > > > > > > ever be stored in a plaintext config file. (For > > > > > example, > > > > > > >> SCRAM > > > > > > >> > > > > > > passwords, > > > > > > >> > > > > > > > etc.) 
You could use a broker API to set the > > > > > > configuration, > > > > > > >> but > > > > > > >> > > > that > > > > > > >> > > > > > > brings > > > > > > >> > > > > > > > up the chicken and egg problem. The broker > needs > > to > > > > be > > > > > > >> > > configured > > > > > > >> > > > to > > > > > > >> > > > > > > know > > > > > > >> > > > > > > > how to talk to you, but you need to configure it > > > > before > > > > > > you > > > > > > >> can > > > > > > >> > > > talk > > > > > > >> > > > > to > > > > > > >> > > > > > > > it. Using an external secret manager like Vault > > is > > > > one > > > > > > way > > > > > > >> to > > > > > > >> > > > solve > > > > > > >> > > > > > > this, > > > > > > >> > > > > > > > but not everyone uses an external secret > manager. > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > quorum.voters seems like a similar configuration > > > key. > > > > > In > > > > > > >> the > > > > > > >> > > > current > > > > > > >> > > > > > > KIP, > > > > > > >> > > > > > > > this is only read if there is no other > > configuration > > > > > > >> specifying > > > > > > >> > > the > > > > > > >> > > > > > > quorum > > > > > > >> > > > > > > > voter set. If we had a kafka.mkfs command, we > > > > wouldn't > > > > > > need > > > > > > >> > this > > > > > > >> > > > key > > > > > > >> > > > > > > > because we could assume that there was always > > quorum > > > > > > >> > information > > > > > > >> > > > > stored > > > > > > >> > > > > > > > locally. 
> > > > > > >> > > > > > > > > > > > > > >> > > > > > > > best, > > > > > > >> > > > > > > > Colin > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > On Thu, Apr 16, 2020, at 16:44, Jason Gustafson > > > wrote: > > > > > > >> > > > > > > > > Hi All, > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > I'd like to start a discussion on KIP-595: > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum > > > > > > >> > > > > > > > . > > > > > > >> > > > > > > > > This proposal specifies a Raft protocol to > > > > ultimately > > > > > > >> replace > > > > > > >> > > > > > Zookeeper > > > > > > >> > > > > > > > > as > > > > > > >> > > > > > > > > documented in KIP-500. Please take a look and > > > share > > > > > your > > > > > > >> > > > thoughts. > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > A few minor notes to set the stage a little > bit: > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > - This KIP does not specify the structure of > the > > > > > > messages > > > > > > >> > used > > > > > > >> > > to > > > > > > >> > > > > > > > represent > > > > > > >> > > > > > > > > metadata in Kafka, nor does it specify the > > > internal > > > > > API > > > > > > >> that > > > > > > >> > > will > > > > > > >> > > > > be > > > > > > >> > > > > > > used > > > > > > >> > > > > > > > > by the controller. Expect these to come in > later > > > > > > >> proposals. 
> > > > > > >> > > Here > > > > > > >> > > > we > > > > > > >> > > > > > are > > > > > > >> > > > > > > > > primarily concerned with the replication > > protocol > > > > and > > > > > > >> basic > > > > > > >> > > > > > operational > > > > > > >> > > > > > > > > mechanics. > > > > > > >> > > > > > > > > - We expect many details to change as we get > > > closer > > > > to > > > > > > >> > > > integration > > > > > > >> > > > > > with > > > > > > >> > > > > > > > > the controller. Any changes we make will be > made > > > > > either > > > > > > as > > > > > > >> > > > > amendments > > > > > > >> > > > > > > to > > > > > > >> > > > > > > > > this KIP or, in the case of larger changes, as > > new > > > > > > >> proposals. > > > > > > >> > > > > > > > > - We have a prototype implementation which I > > will > > > > put > > > > > > >> online > > > > > > >> > > > within > > > > > > >> > > > > > the > > > > > > >> > > > > > > > > next week which may help in understanding some > > > > > details. > > > > > > It > > > > > > >> > has > > > > > > >> > > > > > > diverged a > > > > > > >> > > > > > > > > little bit from our proposal, so I am taking a > > > > little > > > > > > >> time to > > > > > > >> > > > bring > > > > > > >> > > > > > it > > > > > > >> > > > > > > in > > > > > > >> > > > > > > > > line. I'll post an update to this thread when > it > > > is > > > > > > >> available > > > > > > >> > > for > > > > > > >> > > > > > > review. > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > Finally, I want to mention that this proposal > > was > > > > > > drafted > > > > > > >> by > > > > > > >> > > > > myself, > > > > > > >> > > > > > > > Boyang > > > > > > >> > > > > > > > > Chen, and Guozhang Wang. 
> > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > Thanks, > > > > > > >> > > > > > > > > Jason > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > -- > > > > > > >> > > > > > > Leonard Ge > > > > > > >> > > > > > > Software Engineer Intern - Confluent > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > -- > > > > > > >> > > > > > -- Guozhang > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > -- > > > > > > >> > > > -- Guozhang > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > > >> > -- > > > > > > >> > -- Guozhang > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > -- -- Guozhang
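Colin's message quoted above asks for rough calculations on how long a 32-bit (really 31-bit, since the value is signed) epoch would last. A back-of-the-envelope sketch follows. The election rates are hypothetical values chosen for illustration, not measurements from the thread, and the helper name `years_until_exhaustion` is made up for this example.

```python
MAX_EPOCH = 2**31 - 1               # largest value of a signed 32-bit epoch
SECONDS_PER_YEAR = 365 * 24 * 3600  # ignoring leap years


def years_until_exhaustion(elections_per_second: float) -> float:
    """Years until the epoch counter overflows at a constant election rate."""
    if elections_per_second <= 0:
        raise ValueError("election rate must be positive")
    return MAX_EPOCH / elections_per_second / SECONDS_PER_YEAR


# Hypothetical rates: one election per second, per minute, and per hour.
for label, rate in [("1/sec", 1.0), ("1/min", 1 / 60), ("1/hour", 1 / 3600)]:
    print(f"{label}: about {years_until_exhaustion(rate):,.0f} years")
```

Even at a sustained rate of one election per second, the counter would take roughly 68 years to overflow. On the other side of the tradeoff, the space saved is 4 bytes for each place an epoch is stored or sent.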