On Mon, Jul 27, 2020, at 09:20, Jun Rao wrote:
> Hi, Colin,
>
> Thanks for the KIP. A few comments below.
>
Hi Jun,

Thanks for the review. Sorry that it took me a while to respond. I wanted
to carefully re-read the latest version of KIP-595 first, as well as the
other discussions.

> 10. Some of the choices in this KIP are not consistent with KIP-595. It
> would be useful to make consistent choices between the two KIPs.
>
> 10.1 KIP-595 doesn't use a separate Heartbeat request and heartbeat is
> piggybacked through the Fetch request.

At present, these two KIPs describe three different heartbeat mechanisms:

1. the KIP-595 mechanism for choosing a new leader when the current one is
   inaccessible
2. the KIP-631 BrokerHeartbeat mechanism for letting the active controller
   time out brokers
3. the KIP-631 ControllerHeartbeat mechanism for letting the active
   controller coordinate snapshots

The first one, the timeout mechanism described in KIP-595, only applies to
controller nodes. It doesn't say anything about broker nodes. This is
because the KIP-595 mechanism describes how voters (controller nodes)
choose a new leader (active controller). In contrast, the second one, the
BrokerHeartbeat mechanism in KIP-631, describes how the leader (active
controller) decides that brokers should be fenced. The third one is about
coordinating snapshots.

Thinking about this more, I think we should cut the third one from this
proposal. We should discuss it in the context of KIP-630, since it's
properly part of snapshotting. (Additionally, it probably makes more sense
to coordinate snapshotting by using additional fields in the FetchRequest,
rather than having another heartbeat.)

The first two mechanisms are always going to be separate. Coordinating a
leader change within the quorum is simply a different activity from
handling broker timeouts. Brokers are not part of the quorum.

> 10.2 The discussion in KIP-595 still assumes a separate controlled
> shutdown request instead of heartbeat.

KIP-595 says that controllers should send out EndQuorumEpoch when shutting
down gracefully, in order to transition over to a new leader more quickly.
But this is different from a broker sending a BrokerHeartbeat to the active
controller, asking to be shut down. The broker is not part of the quorum,
so it will never send EndQuorumEpoch.

Eventually we may implement Raft for regular partitions, in which case
brokers would probably send EndQuorumEpoch when shutting down. But this
would be part of the broker shutdown process, rather than a replacement for
it. EndQuorumEpoch is purely at the level of Raft, and doesn't imply
anything more than "I don't want to be leader of partition X any more."

Note that KIP-500 also has some discussion of using heartbeats to handle
broker shutdown, although it doesn't include RPC details or anything like
that.

> 10.3 My understanding is that the controller is just the leader in the
> Raft quorum. If so, do we need process.roles and controller.connect in
> this KIP given quorum.voters in KIP-595?

Brokers are not part of the Raft quorum; only controllers are (at least at
present, until we extend Raft to regular, non-metadata replication). So you
need process.roles to choose whether you are starting a controller node, a
broker node, or a broker node and a controller node that co-exist in the
same JVM.
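To make that concrete, here is a rough sketch of what the configuration for
a combined node might look like. The property names and exact formats are
still under discussion, and the hostnames and ports here are purely
illustrative:

    # Illustrative only -- names and values are not final.
    # Run both a broker and a controller in this JVM.
    process.roles=broker,controller

    # How brokers find the controllers (this KIP).
    controller.connect=ctrl1.example.com:9093,ctrl2.example.com:9093,ctrl3.example.com:9093

    # The Raft voters themselves, i.e. the controller nodes (KIP-595).
    quorum.voters=1@ctrl1.example.com:9093,2@ctrl2.example.com:9093,3@ctrl3.example.com:9093

A broker-only node would set process.roles=broker, and a controller-only
node would set process.roles=controller.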
> 11. Fencing: It would be useful to clarify whether the fencing is 1-way or
> 2-way. In ZK, the fencing is 1-way. ZK server determines if a ZK session
> is expired or not. An expired ZK client doesn't know it's fenced until it
> can connect to ZK server. It seems in this KIP, the proposal is for the
> fencing to work in both ways, i.e., the controller can fence a broker and
> a broker can fence itself based on heartbeat independently. There are some
> tradeoffs between these two approaches. It would be useful to document the
> benefits and the limitations of the proposed approach. For example, I
> wonder what happens if the controller and the broker make inconsistent
> fencing decisions in the new approach.

The intention behind the two-way fencing is to make things less confusing
for clients, by minimizing the amount of time during which they can contact
brokers that are no longer part of the cluster without being aware of that
fact. I agree that we cannot rely on the broker fencing itself at the exact
same time as the controller fences it. So there will always be the
possibility of contacting a broker which is not in the cluster any more,
but which doesn't know it. But we can shrink that window to a few seconds,
rather than minutes or hours of "weird behavior."

The main thing to avoid is a scenario where the broker thinks it is fenced,
but the active controller does not. And I do think we will avoid that in
practice, since they are both using the same lease timeout. While the
clocks may run at slightly different speeds on the different computers due
to hardware issues, at very worst that would be something like a difference
of less than a millisecond every few seconds.

I suppose that if a broker became inaccessible at the same time as the
active controller failed over to a new controller, you could get into a
scenario where the active controller took longer to fence the broker than
the broker took to fence itself. This should be pretty rare, though. We
could also make the broker's own timeout longer to compensate for cases
like this (perhaps 2x the timeout on the controller side?).
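Just to spell out the idea, here is a minimal sketch of the two checks,
assuming both sides share the same registration.lease.timeout.ms. The class
and method names below are made up for illustration; this is not the actual
implementation:

    // Sketch only: each side measures elapsed time since the last
    // successful heartbeat, using its own local clock.
    class LeaseCheck {
        // On the active controller: fence the broker once its lease expires.
        static boolean controllerShouldFence(long nowMs,
                                             long lastHeartbeatMs,
                                             long leaseTimeoutMs) {
            return nowMs - lastHeartbeatMs > leaseTimeoutMs;
        }

        // On the broker: self-fence only after a longer window (e.g. 2x the
        // controller-side timeout), so the broker almost never fences itself
        // before the controller does, even across a controller failover.
        static boolean brokerShouldSelfFence(long nowMs,
                                             long lastSuccessfulHeartbeatMs,
                                             long leaseTimeoutMs) {
            return nowMs - lastSuccessfulHeartbeatMs > 2 * leaseTimeoutMs;
        }
    }

Since both sides count from the last successful heartbeat, the extra margin
on the broker side gives the controller a full lease timeout's worth of
slack (for example, during a failover) to fence the broker before the
broker gives up on itself.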
> 12. BrokerRecord:
> 12.1 Currently, BrokerEpoch is the ZK session id. How is BrokerEpoch
> generated without ZK?

The active controller stores the broker epoch in the metadata log. It will
return that broker epoch to the broker as described in the broker state
machine section.

> 12.2 KIP-584 is in progress. So, we need to include the features field.

The features field should be "metadata.format".

> 13. PartitionRecord/IsrChange. IsrChange seems to be representing an
> incremental change to ISR in PartitionRecord. For consistency, should we
> have a separate record for representing incremental change to replicas?
> Currently RemovingReplicas/AddingReplicas are included with many other
> fields in PartitionRecord?

I was thinking about this, but changing the replica set will be a lot rarer
than ISR changes, which happen very often. We really only change the
replica set during reassignment.

> 14. "When the active controller decides that a standby controller should
> start a snapshot, it will communicate that information in its response to
> the periodic heartbeat sent by that node. When the active controller
> decides that it itself should create a snapshot, it will first try to give
> up the leadership of the Raft quorum in order to avoid unnecessary delays
> while writing the snapshot." Is it truly necessary to only do snapshotting
> in the follower? It seems it's simpler to just let every replica do
> snapshotting in a background thread.

That's a good point. I removed this section, and we will now do
snapshotting on the active controller as well as on the standbys. I agree
that this could be purely time-based and managed locally, to make things
simpler. So we should start with that and see whether coordinating
snapshots across the quorum is really necessary.

> 15. Currently, we store SCRAM hashes and delegation tokens in ZooKeeper.
> Should we add records to account for those?
>
> 16. The description of leaderEpoch says "An epoch that gets incremented
> each time we change the ISR." Currently, we only increment leaderEpoch
> when the leader changes.

Fixed.

> 17. Metrics
> 17.1 "kafka.controller:type=KafkaController,name=MetadataSnapshotLag The
> offset delta between the latest stable offset of the metadata topic and
> the offset of the last snapshot (or 0 if there are no snapshots)". 0 could
> be a valid lag. So using that to represent no snapshots can cause
> confusion.

Good point. I changed this to "or the last stable offset itself, if there
are no snapshots".

> 17.2 kafka.controller:type=KafkaController,name=ControllerRequestsRate:
> We already have a rateAndTIme metric per ControllerState. Do we need this
> new metric?

OK, let's remove this.

> 18. Do we need a separate DeletePartition record? This could be useful to
> represent the successful deletion of a single partition.

We don't really support decreasing the number of partitions in a topic now,
right? At the very least, there is no admin API for it. There is
Admin#createPartitions, but nothing for removing partitions. I guess the
workaround is to delete the topic and re-create it with the desired smaller
number of partitions.

> 19. Do we need brokerEpoch in DeleteBroker?

I'll add it. Even if only for debugging purposes, it's good to have an
epoch here so that we can match up broker activations and deactivations.

> 20. controller.id: I had the same feeling as Jason. Requiring the user to
> configure a separate controller id for each broker seems to add more
> complexity. So, we need a good reason to do that. So far, it seems that's
> just for having a unique id when creating the NetworkClient for the
> controller. That's internal and there could be other ways to achieve this.
>
> Thanks,

We do not require a separate controller id for each broker. This ties in to
the discussion from earlier in this email -- controllers are not brokers.
Even in the case where they are co-located, the mental model should be "two
separate processes running in the same JVM." Much like running a datadog
agent in the same JVM as the broker, they will have separate ports, etc.
This is a big shift from how we think about things now, but I think it
makes sense: separate things should have separate IDs.

best,
Colin

> Jun
>
>
> On Thu, Jul 23, 2020 at 11:02 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Hey Colin,
> >
> > some more questions I have about the proposal:
> >
> > 1. We mentioned in the networking section that "The only time when
> > clients should contact a controller node directly is when they are
> > debugging system issues". But later we didn't talk about how to enable
> > this debug mode, could you consider getting a section about that?
> >
> > 2.
“When the active controller decides that a standby controller should > > start a snapshot, it will communicate that information in its response to > > the periodic heartbeat sent by that node.“ In the KIP-595, we provide an > > RPC called `EndQuorumEpoch` which would transfer the leadership role to a > > dedicated successor, do you think we could reuse that method instead of > > piggy-backing on the heartbeat RPC? > > > > 3. The `DeleteBroker` record is listed but not mentioned in details for the > > KIP. Are we going to support removing a broker in runtime, or this record > > is just for the sake of removing an obsolete broker due to heartbeat > > failure? > > > > 4. In the rejected alternatives, we mentioned we don't want to combine > > heartbeats and fetch and listed out the reason was due to extra complexity. > > However, we should also mention some cons caused by this model, for example > > we are doing 2X round trips to maintain a liveness, where as a regular > > follower it should always send out fetch, for sure. If we are combining the > > two, what are the heartbeat request fields we need to populate in the Fetch > > protocol to make it work? Could we piggy-back on the UpdateMetadata RPC to > > propagate the broker state change for listeners separately to the > > controller? I'm not buying either approach here, just hope we could list > > out more reasoning for separating the heartbeat RPC from Fetch, pros and > > cons. > > > > Boyang > > > > On Wed, Jul 15, 2020 at 5:30 PM Colin McCabe <cmcc...@apache.org> wrote: > > > > > On Mon, Jul 13, 2020, at 11:08, Boyang Chen wrote: > > > > Hey Colin, some quick questions, > > > > > > > > 1. I looked around and didn't find a config for broker heartbeat > > > interval, > > > > are we piggy-back on some existing configs? > > > > > > > > > > Good point. I meant to add this, but I forgot. I added > > > registration.heartbeat.interval.ms in the table. > > > > > > > > > > > 2. We only mentioned that the lease time is 10X of the heartbeat > > > interval, > > > > could we also include why we chose this value? > > > > > > > > > > I will add registration.lease.timeout.ms so that this can be set > > > separately from registration.heartbeat.interval.ms. The choice of value > > > is a balance between not timing out brokers too soon, and not keeping > > > unavailable brokers around too long. > > > > > > best, > > > Colin > > > > > > > > > > > On Mon, Jul 13, 2020 at 10:09 AM Jason Gustafson <ja...@confluent.io> > > > wrote: > > > > > > > > > Hi Colin, > > > > > > > > > > Thanks for the proposal. A few initial comments comments/questions > > > below: > > > > > > > > > > 1. I don't follow why we need a separate configuration for > > > > > `controller.listeners`. The current listener configuration already > > > allows > > > > > users to specify multiple listeners, which allows them to define > > > internal > > > > > endpoints that are not exposed to clients. Can you explain what the > > new > > > > > configuration gives us that we don't already have? > > > > > 2. What is the advantage of creating a separate `controller.id` > > > instead of > > > > > just using `broker.id`? > > > > > 3. It sounds like you are imagining a stop-the-world approach to > > > > > snapshotting, which is why we need the controller micromanaging > > > snapshots > > > > > on all followers. Alternatives include fuzzy snapshots which can be > > > done > > > > > concurrently. If this has been rejected, can you add some detail > > about > > > why? > > > > > 4. 
More of a nit, but should `DeleteBrokerRecord` be > > > > > `ShutdownBrokerRecord`? The broker is just getting removed from ISRs, > > > but > > > > > it would still be present in the replica set (I assume). > > > > > > > > > > Thanks, > > > > > Jason > > > > > > > > > > On Sun, Jul 12, 2020 at 12:24 AM Colin McCabe <cmcc...@apache.org> > > > wrote: > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > That's an interesting idea, but I think it would be best to strive > > > for > > > > > > single metadata events that are complete in themselves, rather than > > > > > trying > > > > > > to do something transactional or EOS-like. For example, we could > > > have a > > > > > > create event that contains all the partitions to be created. > > > > > > > > > > > > best, > > > > > > Colin > > > > > > > > > > > > > > > > > > On Fri, Jul 10, 2020, at 04:12, Unmesh Joshi wrote: > > > > > > > I was thinking that we might need something like multi-operation > > > > > > > <https://issues.apache.org/jira/browse/ZOOKEEPER-965> record in > > > > > > zookeeper > > > > > > > to atomically create topic and partition records when this multi > > > record > > > > > > is > > > > > > > committed. This way metadata will have both the TopicRecord and > > > > > > > PartitionRecord together always, and in no situation we can have > > > > > > > TopicRecord without PartitionRecord. Not sure if there are other > > > > > > situations > > > > > > > where multi-operation is needed. > > > > > > > <https://issues.apache.org/jira/browse/ZOOKEEPER-965> > > > > > > > > > > > > > > Thanks, > > > > > > > Unmesh > > > > > > > > > > > > > > On Fri, Jul 10, 2020 at 11:32 AM Colin McCabe < > > cmcc...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > Yes, once the last stable offset advanced, we would consider > > the > > > > > topic > > > > > > > > creation to be done, and then we could return success to the > > > client. > > > > > > > > > > > > > > > > best, > > > > > > > > Colin > > > > > > > > > > > > > > > > On Thu, Jul 9, 2020, at 19:44, Unmesh Joshi wrote: > > > > > > > > > It still needs HighWaterMark / LastStableOffset to be > > advanced > > > by > > > > > two > > > > > > > > > records? Something like following? > > > > > > > > > > > > > > > > > > > > > > > > > > > | | > > > > > > > > > <------------------ |----------------| HighWaterMark > > > > > > > > > Response |PartitionRecord | > > > > > > > > > | | > > > > > > > > > -----------------| > > > > > > > > > | TopicRecord | > > > > > - > > > > > > > > > | | > > > > > > > > > -------------------> ------------------ Previous > > > HighWaterMark > > > > > > > > > CreateTopic | | > > > > > > > > > | | > > > > > > > > > | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Jul 10, 2020 at 1:30 AM Colin McCabe < > > > cmcc...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > > > > > On Thu, Jul 9, 2020, at 04:37, Unmesh Joshi wrote: > > > > > > > > > > > I see that, when a new topic is created, two metadata > > > records, > > > > > a > > > > > > > > > > > TopicRecord (just the name and id of the topic) and a > > > > > > PartitionRecord > > > > > > > > > > (more > > > > > > > > > > > like LeaderAndIsr, with leader id and replica ids for the > > > > > > partition) > > > > > > > > are > > > > > > > > > > > created. 
> > > > > > > > > > > While creating the topic, log entries for both the > > records > > > need > > > > > > to be > > > > > > > > > > > committed in RAFT core. Will it need something like a > > > > > > > > > > MultiOperationRecord > > > > > > > > > > > in zookeeper. Then, we can have a single log entry with > > > both > > > > > the > > > > > > > > records, > > > > > > > > > > > and the create topic request can be fulfilled atomically > > > when > > > > > > both > > > > > > > > the > > > > > > > > > > > records are committed? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > > > > > Since the active controller is the only node writing to the > > > log, > > > > > > there > > > > > > > > is > > > > > > > > > > no need for any kind of synchronization or access control > > at > > > the > > > > > > log > > > > > > > > level. > > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > Unmesh > > > > > > > > > > > > > > > > > > > > > > On Wed, Jul 8, 2020 at 6:57 AM Ron Dagostino < > > > > > rndg...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > HI Colin. Thanks for the KIP. Here is some feedback > > and > > > > > > various > > > > > > > > > > > > questions. > > > > > > > > > > > > > > > > > > > > > > > > "*Controller processes will listen on a separate port > > > from > > > > > > brokers. > > > > > > > > > > This > > > > > > > > > > > > will be true even when the broker and controller are > > > > > > co-located in > > > > > > > > the > > > > > > > > > > same > > > > > > > > > > > > JVM*". I assume it is possible that the port numbers > > > could be > > > > > > the > > > > > > > > same > > > > > > > > > > when > > > > > > > > > > > > using separate JVMs (i.e. broker uses port 9192 and > > > > > controller > > > > > > also > > > > > > > > > > uses > > > > > > > > > > > > port 9192). I think it would be clearer to state this > > > along > > > > > > these > > > > > > > > > > > > lines: "Controller > > > > > > > > > > > > nodes will listen on a port, and the controller port > > must > > > > > > differ > > > > > > > > from > > > > > > > > > > any > > > > > > > > > > > > port that a broker in the same JVM is listening on. In > > > other > > > > > > > > words, a > > > > > > > > > > > > controller and a broker node, when in the same JVM, do > > > not > > > > > > share > > > > > > > > ports" > > > > > > > > > > > > > > > > > > > > > > > > I think the sentence "*In the realm of ACLs, this > > > translates > > > > > to > > > > > > > > > > controllers > > > > > > > > > > > > requiring CLUSTERACTION on CLUSTER for all operations*" > > > is > > > > > > > > confusing. > > > > > > > > > > It > > > > > > > > > > > > feels to me that you can just delete it. Am I missing > > > > > > something > > > > > > > > here? > > > > > > > > > > > > > > > > > > > > > > > > The KIP states "*The metadata will be stored in memory > > > on all > > > > > > the > > > > > > > > > > active > > > > > > > > > > > > controllers.*" Can there be multiple active > > controllers? > > > > > > Should > > > > > > > > it > > > > > > > > > > > > instead read "The metadata will be stored in memory on > > > all > > > > > > > > potential > > > > > > > > > > > > controllers." (or something like that)? 
> > > > > > > > > > > > > > > > > > > > > > > > KIP-595 states "*we have assumed the name > > > __cluster_metadata > > > > > > for > > > > > > > > this > > > > > > > > > > > > topic, but this is not a formal part of this > > proposal*". > > > > > This > > > > > > > > KIP-631 > > > > > > > > > > > > states "*Metadata changes need to be persisted to the > > > > > > __metadata > > > > > > > > log > > > > > > > > > > before > > > > > > > > > > > > we propagate them to the other nodes in the cluster. > > > This > > > > > > means > > > > > > > > > > waiting > > > > > > > > > > > > for the metadata log's last stable offset to advance to > > > the > > > > > > offset > > > > > > > > of > > > > > > > > > > the > > > > > > > > > > > > change.*" Are we here formally defining "__metadata" > > as > > > the > > > > > > topic > > > > > > > > > > name, > > > > > > > > > > > > and should these sentences refer to "__metadata topic" > > > rather > > > > > > than > > > > > > > > > > > > "__metadata log"? What are the "other nodes in the > > > cluster" > > > > > > that > > > > > > > > are > > > > > > > > > > > > referred to? These are not controller nodes but > > brokers, > > > > > > right? > > > > > > > > If > > > > > > > > > > so, > > > > > > > > > > > > then should we say "before we propagate them to the > > > brokers"? > > > > > > > > > > Technically > > > > > > > > > > > > we have a controller cluster and a broker cluster -- > > two > > > > > > separate > > > > > > > > > > clusters, > > > > > > > > > > > > correct? (Even though we could potentially share JVMs > > > and > > > > > > > > therefore > > > > > > > > > > > > require no additional processes.). If the statement is > > > > > > referring to > > > > > > > > > > nodes > > > > > > > > > > > > in both clusters then maybe we should state "before we > > > > > > propagate > > > > > > > > them > > > > > > > > > > to > > > > > > > > > > > > the other nodes in the controller cluster or to > > brokers." > > > > > > > > > > > > > > > > > > > > > > > > "*The controller may have several of these uncommitted > > > > > changes > > > > > > in > > > > > > > > > > flight at > > > > > > > > > > > > any given time. In essence, the controller's in-memory > > > state > > > > > > is > > > > > > > > > > always a > > > > > > > > > > > > little bit in the future compared to the current state. > > > This > > > > > > > > allows > > > > > > > > > > the > > > > > > > > > > > > controller to continue doing things while it waits for > > > the > > > > > > previous > > > > > > > > > > changes > > > > > > > > > > > > to be committed to the Raft log.*" Should the three > > > > > references > > > > > > > > above > > > > > > > > > > be to > > > > > > > > > > > > the active controller rather than just the controller? > > > > > > > > > > > > > > > > > > > > > > > > "*Therefore, the controller must not make this future > > > state > > > > > > > > "visible" > > > > > > > > > > to > > > > > > > > > > > > the rest of the cluster until it has been made > > > persistent – > > > > > > that > > > > > > > > is, > > > > > > > > > > until > > > > > > > > > > > > it becomes current state*". Again I wonder if this > > should > > > > > > refer to > > > > > > > > > > "active" > > > > > > > > > > > > controller, and indicate "anyone else" as opposed to > > "the > > > > > rest > > > > > > of > > > > > > > > the > > > > > > > > > > > > cluster" since we are talking about 2 clusters here? 
> > > > > > > > > > > > > > > > > > > > > > > > "*When the active controller decides that it itself > > > should > > > > > > create a > > > > > > > > > > > > snapshot, it will first try to give up the leadership > > of > > > the > > > > > > Raft > > > > > > > > > > quorum.*" > > > > > > > > > > > > Why? Is it necessary to state this? It seems like it > > > might > > > > > > be an > > > > > > > > > > > > implementation detail rather than a necessary > > > > > > > > constraint/requirement > > > > > > > > > > that > > > > > > > > > > > > we declare publicly and would have to abide by. > > > > > > > > > > > > > > > > > > > > > > > > "*It will reject brokers whose metadata is too stale*". > > > Why? > > > > > > An > > > > > > > > > > example > > > > > > > > > > > > might be helpful here. > > > > > > > > > > > > > > > > > > > > > > > > "*it may lose subsequent conflicts if its broker epoch > > is > > > > > > stale*" > > > > > > > > This > > > > > > > > > > is > > > > > > > > > > > > the first time a "broker epoch" is mentioned. I am > > > assuming > > > > > > it is > > > > > > > > the > > > > > > > > > > > > controller epoch communicated to it (if any). It would > > > be > > > > > > good to > > > > > > > > > > > > introduce it/explicitly state what it is before > > > referring to > > > > > > it. > > > > > > > > > > > > > > > > > > > > > > > > Ron > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jul 7, 2020 at 6:48 PM Colin McCabe < > > > > > > cmcc...@apache.org> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > > > > > > > I posted a KIP about how the quorum-based controller > > > > > > envisioned > > > > > > > > in > > > > > > > > > > > > KIP-500 > > > > > > > > > > > > > will work. Please take a look here: > > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/4RV4CQ > > > > > > > > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >