>>Yes, I agree that the lease timeout on the controller side should be reset. The alternative would be to
>>track the lease as hard state rather than soft state, but I think that is not really needed, and would
>>result in more log entries.

On a related note, I think it will be good to add some details on how leases are maintained in case of
active controller (raft leader) failure. e.g., I have attached a few diagrams considering the case where
leases are maintained with log entries.
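For reference, my rough understanding of the soft state alternative is a sketch like the one below
(the class and method names are made up for illustration; this is not code from the KIP or from my
prototype). Leases live only in the active controller's memory, an expiry becomes a FenceBroker record,
and a newly elected active controller simply restarts every lease timer:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    // Soft-state lease tracking: kept only in memory on the active controller.
    // Nothing about the leases themselves is written to the log; only the
    // consequence of an expiry (a FenceBroker record) is committed.
    class SoftStateLeaseTracker {
        private final Map<Integer, Long> leaseDeadlineNanos = new ConcurrentHashMap<>();
        private final long leaseTimeoutNanos;

        SoftStateLeaseTracker(long leaseTimeoutMs) {
            this.leaseTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(leaseTimeoutMs);
        }

        // Called for every broker heartbeat the active controller receives.
        void onHeartbeat(int brokerId) {
            leaseDeadlineNanos.put(brokerId, System.nanoTime() + leaseTimeoutNanos);
        }

        // Called when this controller becomes the active controller (raft leader).
        // Every registered broker gets a fresh, full lease, which in effect extends
        // the leases by the election time plus whatever had elapsed on the old leader.
        void onBecomeActiveController(Iterable<Integer> registeredBrokerIds) {
            long deadline = System.nanoTime() + leaseTimeoutNanos;
            for (int brokerId : registeredBrokerIds) {
                leaseDeadlineNanos.put(brokerId, deadline);
            }
        }

        // Polled periodically; an expired broker should be fenced by proposing and
        // committing a FenceBroker record through the raft module.
        boolean isExpired(int brokerId) {
            Long deadline = leaseDeadlineNanos.get(brokerId);
            return deadline != null && System.nanoTime() - deadline > 0;
        }
    }

The attached diagrams show the contrasting hard state approach, where the registrations and their
renewals go through the replicated log.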
On Mon, Aug 31, 2020 at 6:28 PM Unmesh Joshi <unmeshjo...@gmail.com> wrote:
>
> >>The reason for including LeaseStartTimeMs in the request is to ensure
> >>that the time required to communicate with the controller gets included
> >>in the lease time. Since requests can potentially be delayed in the
> >>network for a long time, this is important.
>
> The network time will be added anyway, because the lease timer on the
> active controller will start only after the heartbeat request reaches the
> server. And I think some assumption about network round-trip time is
> needed anyway to decide on the frequency of the heartbeat
> (registration.heartbeat.interval.ms) and the lease timeout
> (registration.lease.timeout.ms). So I think just having a leaseTTL in the
> request is easier to understand and implement.
>
> >>>Yes, I agree that the lease timeout on the controller side should be
> >>>reset in the case of controller failover. The alternative would be to
> >>>track the lease as hard state rather than soft state, but I think that
> >>>is not really needed, and would result in more log entries.
>
> My interpretation of the mention of BrokerRecord in the KIP was that this
> record exists in the Raft log. By soft state, do you mean the broker
> records exist only on the active leader and will not be replicated in the
> Raft log? If the live brokers list is maintained only on the active
> controller (raft leader), then, in case of leader failure, there will be
> a window where the new leader does not know about the live brokers, until
> the brokers establish their leases again.
> I think it will be safer to have leases as hard state managed by standard
> Raft replication.
> Or am I misunderstanding something? (I assume that by soft state, you
> mean something like ZooKeeper local sessions,
> https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)
>
> >>Our code is single threaded as well. I think it makes sense for the
> >>controller, since otherwise locking becomes very messy. I'm not sure I
> >>understand your question about duplicate broker ID detection, though.
> >>There's a section in the KIP about this -- is there a detail we should
> >>add there?
>
> I assumed broker leases are implemented as hard state. In that case, to
> check for a broker id conflict, we need to check the broker ids in two
> places:
> 1. Pending broker registrations (which are yet to be committed)
> 2. Already committed broker registrations
>
> Thanks,
> Unmesh
>
>
> On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe <cmcc...@apache.org> wrote:
>
>> On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
>> > >>>Can you repeat your questions about broker leases?
>> >
>> > >>>>The LeaseStartTimeMs is expected to be the broker's
>> > >>>>'System.currentTimeMillis()' at the point of the request. The
>> > >>>>active controller will add its lease period to this in order to
>> > >>>>compute the LeaseEndTimeMs.
>> >
>> > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a
>> > bit confusing. A monotonic clock (System.nanoTime) on the active
>> > controller should be used to track leases.
>> > (For example,
>> > https://issues.apache.org/jira/browse/ZOOKEEPER-1616 and
>> > https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114)
>> >
>> > Then we will not need LeaseStartTimeMs?
>> > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
>> > controller can then calculate LeaseEndTime = System.nanoTime() +
>> > LeaseTTL.
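(To summarize the two options being weighed here, a rough sketch; the field and method names below are
only illustrative and are not the KIP's actual schema or code:

    import java.util.concurrent.TimeUnit;

    class LeaseTimeOptions {
        // Option 1, as the KIP is currently written: the broker sends its
        // wall-clock LeaseStartTimeMs, so time the request spends in the
        // network on the way to the controller counts against the lease.
        static long leaseEndTimeMs(long leaseStartTimeMs, long leasePeriodMs) {
            return leaseStartTimeMs + leasePeriodMs;
        }

        // Option 2, as suggested above: the broker sends only a TTL and the
        // active controller computes a deadline on its own monotonic clock,
        // so the lease timer starts when the heartbeat reaches the controller.
        static long leaseDeadlineNanos(long leaseTtlMs) {
            return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(leaseTtlMs);
        }
    }

The quoted discussion below continues with option 2.)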
>> > In this case we might just drop LeaseEndTimeMs from the response, as
>> > the broker already knows about the TTL and can send heartbeats at some
>> > fraction of TTL, say every TTL/4 milliseconds (elapsed time on the
>> > broker measured by System.nanoTime).
>> >
>>
>> Hi Unmesh,
>>
>> I agree that the monotonic clock is probably a better idea here. It is
>> good to be robust against wall clock changes, although I think a cluster
>> which had them might suffer other issues. I will change it to specify a
>> monotonic clock.
>>
>> The reason for including LeaseStartTimeMs in the request is to ensure
>> that the time required to communicate with the controller gets included
>> in the lease time. Since requests can potentially be delayed in the
>> network for a long time, this is important.
>>
>> >
>> > I have a prototype built to demonstrate this, as follows:
>> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
>> >
>> > The Kip631Controller itself depends on a Consensus module, to
>> > demonstrate what the possible interactions with the consensus module
>> > will look like.
>> > (The Consensus module can really be pluggable, with an API to allow
>> > reading the replicated log up to the HighWaterMark.)
>> >
>> > It has an implementation of LeaseTracker
>> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
>> > to demonstrate the LeaseTracker's interaction with the consensus
>> > module.
>> >
>> > The implementation has the following aspects:
>> > 1. The lease tracking happens only on the active controller (raft
>> > leader).
>> > 2. Once a lease expires, the controller needs to propose and commit a
>> > FenceBroker record for that lease.
>> > 3. In case of active controller failure, the lease will be tracked by
>> > the new raft leader. The new raft leader starts the lease timer again
>> > (as implemented in the onBecomingLeader method of
>> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala),
>> > in effect extending the lease by the time spent in the leader election
>> > and whatever time had elapsed on the old leader.
>>
>> Yes, I agree that the lease timeout on the controller side should be
>> reset in the case of controller failover. The alternative would be to
>> track the lease as hard state rather than soft state, but I think that
>> is not really needed, and would result in more log entries.
>>
>> >
>> > There are working tests for this implementation here:
>> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
>> > and an end-to-end test here:
>> > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
>> >
>> > >>I'm not sure what you mean by "de-duplication of the broker." Can
>> > >>you give a little more context?
>> > Apologies for using the confusing term deduplication. I meant broker
>> > id conflict.
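(The conflict check I have in mind is roughly the following; this is only a sketch with made-up names,
not the prototype's or the KIP's actual code. The point is that a registration must be rejected if the
broker id is already taken, either by a committed registration or by one that has been proposed to the
raft module but not yet committed:

    import java.util.HashMap;
    import java.util.Map;

    // Duplicate-broker-id detection on a single-threaded controller.
    class BrokerRegistrationValidator {
        private final Map<Integer, Long> committedEpochs = new HashMap<>();
        private final Map<Integer, Long> pendingEpochs = new HashMap<>();

        // Called from the single request-handling thread before proposing a
        // BrokerRecord to the raft module.
        boolean mayRegister(int brokerId) {
            return !committedEpochs.containsKey(brokerId)
                && !pendingEpochs.containsKey(brokerId);
        }

        // Called right after the BrokerRecord has been proposed.
        void markPending(int brokerId, long brokerEpoch) {
            pendingEpochs.put(brokerId, brokerEpoch);
        }

        // Called when the BrokerRecord is committed, i.e. applied from the log.
        void markCommitted(int brokerId, long brokerEpoch) {
            pendingEpochs.remove(brokerId);
            committedEpochs.put(brokerId, brokerEpoch);
        }

        // Called if the proposal fails or times out.
        void abortPending(int brokerId) {
            pendingEpochs.remove(brokerId);
        }
    }
)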
>> > As you can see in the prototype handleRequest of KIP631Controller
>> > (https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala),
>> > the duplicate broker id needs to be detected before the BrokerRecord
>> > is submitted to the raft module.
>> > Also, as implemented in the prototype, the KIP631Controller is single
>> > threaded, handling requests one at a time (an example of
>> > https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html).
>>
>> Our code is single threaded as well. I think it makes sense for the
>> controller, since otherwise locking becomes very messy. I'm not sure I
>> understand your question about duplicate broker ID detection, though.
>> There's a section in the KIP about this -- is there a detail we should
>> add there?
>>
>> best,
>> Colin
>>
>>
>> >
>> > Thanks,
>> > Unmesh
>> >
>> > On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe <co...@cmccabe.xyz> wrote:
>> >
>> > > On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
>> > > > Hi Colin,
>> > > >
>> > > > There were a few questions I had.
>> > >
>> > > Hi Unmesh,
>> > >
>> > > Thanks for the response.
>> > >
>> > > >
>> > > > 1. Were my comments on the broker lease implementation (and the
>> > > > corresponding prototype) appropriate, and do we need to change the
>> > > > KIP description accordingly?
>> > > >
>> > >
>> > > Can you repeat your questions about broker leases?
>> > >
>> > > >
>> > > > 2. How will broker epochs be generated? I am assuming it can be
>> > > > the committed log offset (like zxid?)
>> > > >
>> > >
>> > > There isn't any need to use a log offset. We can just look at an
>> > > in-memory hash table and see what the latest number is, and add one,
>> > > to generate a new broker epoch.
>> > >
>> > > >
>> > > > 3. How will producer registration happen? I am assuming it should
>> > > > be similar to broker registration, with a similar way to generate
>> > > > a producer id.
>> > > >
>> > >
>> > > For the EOS stuff, we will need a few new RPCs to the controller. I
>> > > think we should do that in a follow-on KIP, though, since this one
>> > > is already pretty big.
>> > >
>> > > >
>> > > > 4. Because we expose the Raft log to all the brokers, any
>> > > > de-duplication of the broker needs to happen before the requests
>> > > > are proposed to Raft. For this, the controller needs to be single
>> > > > threaded, and should do validation against the in-process or
>> > > > pending requests and the final state. I read a mention of this in
>> > > > the responses in this thread. Will it be useful to mention this in
>> > > > the KIP?
>> > > >
>> > >
>> > > I'm not sure what you mean by "de-duplication of the broker." Can
>> > > you give a little more context?
>> > >
>> > > best,
>> > > Colin
>> > >
>> > > >
>> > > > Thanks,
>> > > > Unmesh
>> > > >
>> > > > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe <cmcc...@apache.org> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > I'm thinking of calling a vote on KIP-631 on Monday. Let me know
>> > > > > if there are any more comments I should address before I start
>> > > > > the vote.
>> > > > >
>> > > > > cheers,
>> > > > > Colin
>> > > > >
>> > > > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
>> > > > > > >>Hi Unmesh,
>> > > > > > >>Thanks, I'll take a look.
>> > > > > > Thanks. I will be adding more to the prototype and will be
>> > > > > > happy to help and collaborate.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Unmesh
>> > > > > >
>> > > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe <cmcc...@apache.org> wrote:
>> > > > > >
>> > > > > > > Hi Jose,
>> > > > > > >
>> > > > > > > That's a good point that I hadn't considered. It's probably
>> > > > > > > worth having a separate leader change message, as you
>> > > > > > > mentioned.
>> > > > > > >
>> > > > > > > Hi Unmesh,
>> > > > > > >
>> > > > > > > Thanks, I'll take a look.
>> > > > > > >
>> > > > > > > best,
>> > > > > > > Colin
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
>> > > > > > > > Hi Unmesh,
>> > > > > > > >
>> > > > > > > > Very cool prototype!
>> > > > > > > >
>> > > > > > > > Hi Colin,
>> > > > > > > >
>> > > > > > > > The KIP proposes a record called IsrChange which includes
>> > > > > > > > the partition, topic, isr, leader and leader epoch. During
>> > > > > > > > normal operation, ISR changes do not result in leader
>> > > > > > > > changes. Similarly, leader changes do not necessarily
>> > > > > > > > involve ISR changes. The controller implementation that
>> > > > > > > > uses ZK modeled them together because:
>> > > > > > > > 1. All of this information is stored in one znode.
>> > > > > > > > 2. ZK's optimistic lock requires that you specify the new
>> > > > > > > > value completely.
>> > > > > > > > 3. The change to that znode was being performed by both
>> > > > > > > > the controller and the leader.
>> > > > > > > >
>> > > > > > > > None of these reasons are true in KIP-500. Have we
>> > > > > > > > considered having two different records? For example:
>> > > > > > > >
>> > > > > > > > 1. An IsrChange record which includes topic, partition
>> > > > > > > > and isr
>> > > > > > > > 2. A LeaderChange record which includes topic, partition,
>> > > > > > > > leader and leader epoch
>> > > > > > > >
>> > > > > > > > I suspect that making this change will also require
>> > > > > > > > changing the AlterIsrRequest message introduced in
>> > > > > > > > KIP-497: Add inter-broker API to alter ISR.
>> > > > > > > >
>> > > > > > > > Thanks
>> > > > > > > > -Jose
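P.S. On the record-split discussion at the bottom of the thread: my reading of Jose's suggestion is
roughly the pair of records sketched below. The field layout is only illustrative (it is not the KIP's
actual record definition); the point is that an ISR-only change would no longer have to carry the leader
and leader epoch, and a leader change would no longer have to carry the ISR.

    // Written when the ISR of a partition changes but the leader stays the same.
    class IsrChangeRecord {
        final String topic;
        final int partition;
        final int[] isr;

        IsrChangeRecord(String topic, int partition, int[] isr) {
            this.topic = topic;
            this.partition = partition;
            this.isr = isr;
        }
    }

    // Written when the leader of a partition changes, bumping the leader epoch.
    class LeaderChangeRecord {
        final String topic;
        final int partition;
        final int leader;
        final int leaderEpoch;

        LeaderChangeRecord(String topic, int partition, int leader, int leaderEpoch) {
            this.topic = topic;
            this.partition = partition;
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
        }
    }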