>>>Can you repeat your questions about broker leases?

>>>>The LeaseStartTimeMs is expected to be the broker's
'System.currentTimeMillis()' at the point of the request. The active
controller will add its lease period to this in order to compute the
LeaseEndTimeMs.

I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a bit
confusing. A monotonic clock (System.nanoTime) on the active controller
should be used to track leases.
(For example,
https://issues.apache.org/jira/browse/ZOOKEEPER-1616
https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
)

Then we will not need LeaseStartTimeMs?
Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active controller
can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL (with
LeaseTTL converted to nanoseconds).
In this case we might just drop LeaseEndTimeMs from the response, as the
broker already knows about the TTL and can send heartbeats at some fraction
of the TTL, say every TTL/4 milliseconds (elapsed time on the broker being
measured by System.nanoTime).
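
A minimal sketch of the TTL idea above, in Java rather than the prototype's
Scala. The class name TtlLease, its methods, and the injected clock are all
hypothetical and are not taken from the KIP or from the prototype; the clock
is passed in only so that the behaviour can be checked without sleeping.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Controller-side view of one broker lease, tracked on a monotonic clock. */
final class TtlLease {
    private final long ttlNanos;
    private final LongSupplier nanoClock; // assumption: System::nanoTime in a real controller
    private long expiresAtNanos;

    TtlLease(long ttlMs, LongSupplier nanoClock) {
        this.ttlNanos = TimeUnit.MILLISECONDS.toNanos(ttlMs);
        this.nanoClock = nanoClock;
        renew();
    }

    /** Called on every heartbeat: the deadline uses only the controller's own clock. */
    void renew() {
        expiresAtNanos = nanoClock.getAsLong() + ttlNanos;
    }

    /** Compares by subtraction, which stays correct if the nanoTime value overflows. */
    boolean isExpired() {
        return nanoClock.getAsLong() - expiresAtNanos >= 0;
    }

    /** Broker side: heartbeat at a fraction of the TTL (TTL/4, as suggested above). */
    static long heartbeatIntervalMs(long ttlMs) {
        return Math.max(1, ttlMs / 4);
    }
}
```

With this shape the broker never sends a wall-clock timestamp; it only needs
the TTL to pick its heartbeat interval.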

I have built a prototype to demonstrate this:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala

The Kip631Controller itself depends on a Consensus module, to demonstrate
what the possible interactions with the consensus module will look like.
(The consensus module can be pluggable, with an API to allow reading the
replicated log up to the HighWaterMark.)

It has an implementation of LeaseTracker
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
to demonstrate LeaseTracker's interaction with the consensus module.

The implementation has the following aspects:
1. The lease tracking happens only on the active controller (raft leader)
2. Once a lease expires, the active controller needs to propose and commit a
FenceBroker record for that lease.
3. In case of active controller failure, the lease will be tracked by the
new raft leader. The new raft leader starts the lease timer again (as
implemented in the onBecomingLeader method of
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
),
in effect extending the lease by the time spent in the leader election and
whatever time had elapsed on the old leader.
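
The three aspects above can be sketched roughly as follows. This is an
illustrative Java sketch, not the prototype's Scala LeaderLeaseTracker: the
class name, the method names, and the choice to return the expired broker ids
(so that the caller can propose a FenceBroker record for each) are all
assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.LongSupplier;

final class LeaderLeaseTrackerSketch {
    private final long ttlNanos;
    private final LongSupplier nanoClock;                         // System::nanoTime in practice
    private final Set<Integer> registered = new TreeSet<>();      // replicated state, on every node
    private final Map<Integer, Long> deadlines = new TreeMap<>(); // local state, leader only
    private boolean leader;

    LeaderLeaseTrackerSketch(long ttlNanos, LongSupplier nanoClock) {
        this.ttlNanos = ttlNanos;
        this.nanoClock = nanoClock;
    }

    /** Applied on every node when a broker registration record is committed. */
    void onBrokerRegistered(int brokerId) {
        registered.add(brokerId);
        if (leader) deadlines.put(brokerId, nanoClock.getAsLong() + ttlNanos);
    }

    /** Aspect 3: a new leader restarts every lease timer from its own clock. */
    void onBecomingLeader() {
        leader = true;
        long deadline = nanoClock.getAsLong() + ttlNanos;
        for (int brokerId : registered) deadlines.put(brokerId, deadline);
    }

    /** Renews a lease; returns false on a follower or for an unknown broker. */
    boolean heartbeat(int brokerId) {
        if (!leader || !deadlines.containsKey(brokerId)) return false;
        deadlines.put(brokerId, nanoClock.getAsLong() + ttlNanos);
        return true;
    }

    /** Aspects 1 and 2: only the leader reports expired leases, to be fenced via the log. */
    List<Integer> expiredBrokers() {
        List<Integer> expired = new ArrayList<>();
        if (!leader) return expired;
        long now = nanoClock.getAsLong();
        for (Map.Entry<Integer, Long> e : deadlines.entrySet()) {
            if (now - e.getValue() >= 0) expired.add(e.getKey());
        }
        return expired;
    }

    /** Applied once the FenceBroker record for this broker has been committed. */
    void onBrokerFenced(int brokerId) {
        registered.remove(brokerId);
        deadlines.remove(brokerId);
    }
}
```

Because nanoTime values are not comparable across JVMs, the new leader cannot
inherit the old leader's deadlines, which is why restarting the timers (and
thereby extending the leases) is the simple option here.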

There are working tests for this implementation here:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
and an end-to-end test here:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
>>I'm not sure what you mean by "de-duplication of the broker."  Can you
give a little more context?
Apologies for using the confusing term "deduplication". I meant a broker id
conflict.
As you can see in the prototype's handleRequest method of KIP631Controller
<https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala>,
a duplicate broker id needs to be detected before the BrokerRecord is
submitted to the raft module.
Also, as implemented in the prototype, the KIP631Controller is single
threaded, handling requests one at a time (an example of
https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html
).
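
As a rough illustration of the two points above, here is a small Java sketch
that funnels registrations through a single thread and rejects a conflicting
broker id before anything is proposed. The class SingleThreadedRegistrar and
the proposeToRaft callback are hypothetical stand-ins for the controller and
the consensus module, not the prototype's actual API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

final class SingleThreadedRegistrar implements AutoCloseable {
    // One worker thread, so requests are handled strictly one at a time.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();
    // Ids that are committed or still pending; only the queue thread touches this set.
    private final Set<Integer> knownOrPending = new HashSet<>();
    // Hypothetical stand-in for submitting a BrokerRecord to the raft module.
    private final IntConsumer proposeToRaft;

    SingleThreadedRegistrar(IntConsumer proposeToRaft) {
        this.proposeToRaft = proposeToRaft;
    }

    /** Completes with true if the record was proposed, false on a broker id conflict. */
    CompletableFuture<Boolean> register(int brokerId) {
        return CompletableFuture.supplyAsync(() -> {
            if (!knownOrPending.add(brokerId)) {
                return false; // conflict detected before the record reaches the log
            }
            proposeToRaft.accept(brokerId);
            return true;
        }, queue);
    }

    @Override
    public void close() {
        queue.shutdown();
    }
}
```

A fuller version would also remove the id from the set again if the proposal
fails to commit; that is left out here to keep the sketch short.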

Thanks,
Unmesh

On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe <co...@cmccabe.xyz> wrote:

> On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
> > Hi Colin,
> >
> > There were a few questions I had.
>
> Hi Unmesh,
>
> Thanks for the response.
>
> >
> > 1. Were my comments on the broker lease implementation (and corresponding
> > prototype) appropriate and do we need to change the KIP
> > description accordingly?
> >
>
> Can you repeat your questions about broker leases?
>
> >
> > 2. How will broker epochs be generated? I am assuming it can be the
> > committed log offset (like zxid?)
> >
>
> There isn't any need to use a log offset.  We can just look at an
> in-memory hash table and see what the latest number is, and add one, to
> generate a new broker epoch.
>
> >
> > 3. How will producer registration happen? I am assuming it should be
> > similar to broker registration, with a similar way to generate producer
> > id.
> >
>
> For the EOS stuff, we will need a few new RPCs to the controller.  I think
> we should do that in a follow-on KIP, though, since this one is already
> pretty big.
>
> >
> > 4. Because we expose the Raft log to all the brokers, any de-duplication
> > of the broker needs to happen before the requests are proposed to Raft.
> > For this the controller needs to be single threaded, and should do
> > validation against the in-process or pending requests and the final
> > state. I read a mention of this in the responses in this thread. Will it
> > be useful to mention this in the KIP?
> >
>
> I'm not sure what you mean by "de-duplication of the broker."  Can you
> give a little more context?
>
> best,
> Colin
>
> >
> > Thanks,
> > Unmesh
> >
> > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > Hi all,
> > >
> > > I'm thinking of calling a vote on KIP-631 on Monday.  Let me know if
> > > there's any more comments I should address before I start the vote.
> > >
> > > cheers,
> > > Colin
> > >
> > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
> > > > >>Hi Unmesh,
> > > > >>Thanks, I'll take a look.
> > > > > Thanks. I will be adding more to the prototype and will be happy to
> > > > > help and collaborate.
> > > >
> > > > Thanks,
> > > > Unmesh
> > > >
> > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe <cmcc...@apache.org> wrote:
> > > >
> > > > > Hi Jose,
> > > > >
> > > > > > That's a good point that I hadn't considered.  It's probably worth
> > > > > > having a separate leader change message, as you mentioned.
> > > > >
> > > > > Hi Unmesh,
> > > > >
> > > > > Thanks, I'll take a look.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
> > > > > > Hi Unmesh,
> > > > > >
> > > > > > Very cool prototype!
> > > > > >
> > > > > > Hi Colin,
> > > > > >
> > > > > > The KIP proposes a record called IsrChange which includes the
> > > > > > partition, topic, isr, leader and leader epoch. During normal
> > > > > > operation ISR changes do not result in leader changes. Similarly,
> > > > > > > leader changes do not necessarily involve ISR changes. The
> > > > > > > controller implementation that uses ZK modeled them together
> > > > > > > because
> > > > > > > 1. All of this information is stored in one znode.
> > > > > > > 2. ZK's optimistic lock requires that you specify the new value
> > > > > > > completely.
> > > > > > > 3. The change to that znode was being performed by both the
> > > > > > > controller and the leader.
> > > > > >
> > > > > > > None of these reasons are true in KIP-500. Have we considered
> > > > > > > having two different records? For example
> > > > > > >
> > > > > > > 1. IsrChange record which includes topic, partition, isr
> > > > > > > 2. LeaderChange record which includes topic, partition, leader
> > > > > > > and leader epoch.
> > > > > >
> > > > > > > I suspect that making this change will also require changing the
> > > > > > > message AlterIsrRequest introduced in KIP-497: Add inter-broker
> > > > > > > API to alter ISR.
> > > > > >
> > > > > > Thanks
> > > > > > -Jose
> > > > > >
> > > > >
> > > >
> > >
> >
>
