On Fri, Sep 25, 2020, at 10:17, Jun Rao wrote: > Hi, Colin, > > Thanks for the reply. > > 60. Yes, I think you are right. We probably need the controller id when a > broker starts up. A broker only stores the Raft leader id in the metadata > file. To do the initial fetch to the Raft leader, it needs to know the > host/port associated with the leader id. > > 62. It seems there are 2 parts to this : (1) which listener a client should > use to initiate a connection to the controller and (2) which listener > should a controller use to accept client requests. For (1), at any point of > time, a client only needs to use one listener. I think > controller.listener.name is meant for the client.
Hi Jun, controller.listener.names is also used by the controllers. In the case where we have a broker and a controller in the same JVM, we have a single config file. Then we need to know which listeners belong to the controller and which belong to the broker component. That's why it's a list. > So, a single value seems > to make more sense. Currently, we don't have a configuration for (2). We > could add a new one for that and support a list. I am wondering how useful > it will be. One example that I can think of is that we can reject > non-controller related requests if accepted on controller-only listeners. > However, we already support separate authentication for the controller > listener. So, not sure how useful it is. The controller always has a separate listener and does not share listeners with the broker. The main reason for this is to avoid giving clients the ability to launch a denial-of-service attack on the controller by flooding a broker port. A lot of times, these attacks are made unintentionally by poorly coded or configured clients. Additionally, broker ports can also be very busy with large fetch requests, and so on. It's just a bad configuration in general to try to overload the same port for the controller and broker, and we don't want to allow it. It would be a regression to go from the current system where control requests are safely isolated on a separate port, to one where they're not. It also makes the code and configuration a lot messier. > > 63. (a) I think most users won't know controller.id defaults to broker.id + > 3000. So, it can be confusing for them to set up controller.connect. If > this is truly needed, it seems that it's less confusing to make > controller.id required. > (b) I am still trying to understand if we truly need to expose a > controller.id. What if we only expose broker.id and let controller.connect > just use broker.id? What will be missing? The controller has a separate ID from the broker, so knowing broker.id is not helpful here. best, Colin > > Thanks, > > Jun > > On Thu, Sep 24, 2020 at 10:55 PM Colin McCabe <cmcc...@apache.org> wrote: > > > On Thu, Sep 24, 2020, at 16:24, Jun Rao wrote: > > > Hi, Colin, > > > > > > Thanks for the reply and the updated KIP. A few more comments below. > > > > > > > Hi Jun, > > > > > > > > 53. It seems that you already incorporated the changes in KIP-516. With > > > topic ids, we don't need to wait for the topic's data to be deleted > > before > > > removing the topic metadata. If the topic is recreated, we can still > > delete > > > the data properly based on the topic id. So, it seems that we can remove > > > TopicRecord.Deleting. > > > > > > > Thanks for the reply. What I was thinking of doing here was using topic > > IDs internally, but still using names externally. So the topic UUIDs are > > only for the purpose of associating topics with partitions -- from the > > user's point of view, topics are still identified by names. > > > > You're right that KIP-516 will simplify things, but I'm not sure when that > > will land, so I wanted to avoid blocking the initial implementation of this > > KIP on it. > > > > > > > > 55. It seems to me that the current behavior where we favor the current > > > broker registration is better. This is because uncontrolled broker > > shutdown > > > is rare and its impact is less severe since one just needs to wait for > > the > > > session timeout before restarting the broker. If a mis-configured broker > > > replaces an existing broker, the consequence is more severe since it can > > > cause the leader to be unavailable, a replica to be out of ISR, and add > > > more load on the leaders etc. > > > > > > > Hmm, that's a good point. Let me check this a bit more before I change > > it, though. > > > > > > > > 60. controller.connect=0...@controller0.example.com:9093, > > > 1...@controller1.example.com:9093,2...@controller2.example.com : Do we > > > need to > > > include the controller id before @? It seems that the host/port is enough > > > for establishing the connection. It would also be useful to make it clear > > > that controller.connect replaces quorum.voters in KIP-595. > > > > > > > I discussed this with Jason earlier, and he felt that the controller IDs > > were needed in this configuration key. It is certainly needed when > > configuring the controllers themselves, since they need to know each > > others' IDs. > > > > > > > > 61. I am not sure that we need both controller.listeners and > > > controller.connect.security.protocol since the former implies the > > security > > > protocol. The reason that we have both inter.broker.listener.name and > > > inter.broker.security.protocol is because we had the latter first and > > later > > > realized that the former is more general. > > > > > > > That's a good point. I've removed this from the KIP. > > > > > > > > 62. I am still not sure that you need controller.listeners to be a list. > > > All listeners are already defined in listeners. To migrate from plaintext > > > to SSL, one can configure listeners to have both plaintext and SSL. After > > > that, one can just change controller.listeners from plaintext to SSL. > > This > > > is similar to how to change the listener for inter broker connections. > > > Also, controller.listener.name may be a more accurate name? > > > > > > > The issue that I see here is that if you are running with the controller > > and broker in the same JVM, if you define a few listeners in "listeners" > > they will get used as regular broker listeners, unless you put them in > > controller.listeners. Therefore, controller.listeners needs to be a list. > > > > controller.listener.names does sound like a better name, though... I've > > updated it to that. > > > > > > > > 63. Regarding controller.id, I am trying to understand whether it's a > > > required configuration or an optional one. From the KIP, it sounds like > > > controller.id is optional. Then, if this is unset, it's not clear how > > the > > > user will obtain the controller id for setting controller.connect. > > > > > > > If you specify broker.id but not controller.id, then controller.id will > > just be set to broker.id + 3000. This was intended to make some > > configurations a little bit more concise to write. You still do have to > > know the controller IDs when configuring the brokers, though. If this > > seems confusing then I can just make it mandatory. > > > > > > > > 64. KIP-516 adds a flag in LeaderAndIsrRequest to indicate whether it > > > includes all partitions or not. This will be used to clean up strayed > > logs. > > > I was thinking how we will do the same thing once the controller moves to > > > Raft based. One way to do that is on broker startup, it gets the HWM in > > the > > > metadata log from the Raft leader and waits until its metadata catches up > > > to HWM. At that point, any local log not seen in the metadata can be > > > removed. Since the Fetch response returns the HWM, there seems to be > > enough > > > APIs to achieve this. > > > > > > > That's a very good point. I added a note about this under Broker Startup. > > > > best, > > Colin > > > > > > > > Jun > > > > > > > > > > > > On Thu, Sep 24, 2020 at 1:07 PM Colin McCabe <co...@cmccabe.xyz> wrote: > > > > > > > On Mon, Sep 21, 2020, at 18:13, Jun Rao wrote: > > > > > Hi, Colin, > > > > > > > > > > Sorry for the late reply. A few more comments below. > > > > > > > > > > > > > Hi Jun, > > > > > > > > Thanks for taking another look. > > > > > > > > > > > > > > 50. Configurations > > > > > 50.1 controller.listeners: It seems that a controller just needs one > > > > > listener. Why do we need to have a list here? Also, could you > > provide an > > > > > example of how this is set and what's its relationship with existing > > > > > configs such as "security.inter.broker.protocol" and " > > > > > inter.broker.listener.name"? > > > > > > > > > > > > > I agree that most administrators will want to run with only one > > controller > > > > listener. However, just as with brokers, it is nice to have the > > option to > > > > expose multiple ports. > > > > > > > > One reason why you might want multiple ports is if you were doing a > > > > migration from plaintext to SSL. You could add new SSL listeners but > > > > continue to expose the PLAINTEXT listeners. Then you could add the > > new SSL > > > > controller config to each broker and roll each broker. Then you could > > > > migrate from PLAINTEXT to SSL with no downtime. > > > > > > > > Here's an example configuration for the controller: > > > > controller.connect=0...@controller0.example.com:9093, > > > > 1...@controller1.example.com:9093,2...@controller2.example.com > > > > controller.listeners=CONTROLLER > > > > listeners=CONTROLLER://controller0.example.com:9093 > > > > listener.security.protocol.map=CONTROLLER:SSL > > > > > > > > Here's an example configuration for the broker: > > > > controller.connect=0...@controller0.example.com:9093, > > > > 1...@controller1.example.com:9093,2...@controller2.example.com > > > > controller.connect.security.protocol=SSL > > > > > > > > security.inter.broker.protocol or inter.broker.listener.name do not > > > > affect how the broker communicates with the controller. Those > > > > configurations relate to how brokers communicate with each other, but > > the > > > > controller is not a broker. > > > > > > > > (Note that I just added controller.connect.security.protocol to the > > KIP -- > > > > I had forgotten to put it in earlier) > > > > > > > > > > > > > > 50.2 registration.heartbeat.interval.ms and > > > > registration.lease.timeout.ms. > > > > > Should we match their default value with the corresponding default > > for > > > > ZK? > > > > > > > > > > > > > Fair enough. I'll set them to the values of the > > zookeeper.sync.time.ms > > > > and zookeeper.connection.timeout.ms configurations. I do think we > > should > > > > experiment later on to see what works well here, but the ZK values are > > at > > > > least a starting point. > > > > > > > > > > > > > > 50.3 controller.connect: Could you provide an example? I am > > wondering how > > > > > it differs from quorum.voters=1@kafka-1:9092, 2@kafka-2:9092, > > 3@kafka-3 > > > > > :9092. > > > > > > > > > > > > > controller.connect is intended to be the new name for quorum.voters. > > > > During the vote for KIP-595, we sort of agreed to defer the discussion > > > > about what this configuration should be called. I proposed this new > > name > > > > because it makes it clear what the configuration is for (how to > > connect to > > > > the controllers). > > > > > > > > > > > > > > 50.4 controller.id: I am still not sure how this is being used. > > Could > > > > you > > > > > explain this in more detail? > > > > > > > > > > > > > The controller ID needs to be set on each controller. It also appears > > in > > > > controller.connect as the thing before the "at" sign. Its function is > > > > pretty similar to broker.id. > > > > > > > > Broker IDs and controller IDs exist in the same ID space, so you can't > > > > have a broker and a controller that use the same ID. > > > > > > > > > > > > > > 51. BrokerHeartbeat: It seems a bit wasteful to include Listeners in > > > > every > > > > > heartbeat request since it typically doesn't change. Could we make > > that > > > > an > > > > > optional field? > > > > > > > > > > > > > Ok. > > > > > > > > > > > > > > 52. KIP-584 adds a new ZK node /features. Should we add a > > corresponding > > > > > metadata record? > > > > > > > > > > > > > Good point. I added FeatureLevelRecord. > > > > > > > > > > > > > > 53. TopicRecord and DeleteTopic: Both DeleteTopic and > > > > TopicRecord.Deleting > > > > > indicate topic deletion. Could we outline the flow when each will be > > set? > > > > > In particular, which one indicates the intention to delete and which > > one > > > > > indicates the completion of the deletion. > > > > > > > > > > > > > TopicRecord.Deleting is set when we intend to delete the topic. > > > > > > > > DeleteTopic removes the topic completely. I will rename DeleteTopic to > > > > RemoveTopic to make this clearer. > > > > > > > > > > > > > > 54. "The controller can generate a new broker epoch by using the > > latest > > > > log > > > > > offset." Which offset is that? Is it the offset of the metadata > > topic for > > > > > the corresponding BrokerRecord? Is it guaranteed to be different on > > each > > > > > broker restart? > > > > > > > > > > > > > Yes.... a new broker epoch implies that there has been a new record > > > > created in the metadata log. Therefore the last committed offset must > > be > > > > different. > > > > > > > > > > > > > > 55. "Thereafter, it may lose subsequent conflicts if its broker > > epoch is > > > > > stale. (See KIP-380 for some background on broker epoch.) The > > reason > > > > for > > > > > favoring new processes is to accommodate the common case where a > > process > > > > is > > > > > killed with kill -9 and then restarted. " Are you saying that if > > there is > > > > > an active broker registered in the controller, a new broker heartbeat > > > > > request with the INITIAL state will fence the current broker > > session? Not > > > > > sure about this. For example, this will allow a broker with > > incorrectly > > > > > configured broker id to kill an existing valid broker. > > > > > > > > > > > > > Yes, a new broker with an incorrectly configured broker id could fence > > an > > > > existing valid broker. > > > > > > > > The goal of the conflict resolution process described here is to avoid > > > > having two brokers go back and forth in claiming the broker id. > > Choosing > > > > the newest was just an easy way to do that. > > > > > > > > Choosing the oldest is another possibility, and more inline with what > > > > ZooKeeper does today. The problem with that is that it would not be > > > > possible to re-create the broker if the old instance died suddenly, > > except > > > > by waiting for the full lease timeout. > > > > > > > > It might be possible to do a little better here by breaking the lease > > if > > > > the TCP connection from the broker drops. But that requires crossing > > some > > > > layers of abstraction in the Kafka networking stack. There's also the > > > > possibility of false negatives or positives. For example, TCP > > connections > > > > sometimes just drop even if the node is still there. Or sometimes the > > > > networking stack keeps a TCP connection alive for a while when the > > other > > > > end is gone. > > > > > > > > > > > > > > 56. kafka-storage.sh: > > > > > 56.1 In the info mode, what other information does it show in > > addition to > > > > > "kip.500.mode=enabled"? > > > > > > > > > > > > > I think it should show a list of all the storage directories, and > > whether > > > > each one is formatted. In addition, it should show whether > > kip.500.mode is > > > > enabled, and what the cluster id is. > > > > > > > > > > > > > > 56.2 Should the format mode take the config file as the input too > > like > > > > the > > > > > info mode? > > > > > > > > > > > > > Yes, that's a good idea. > > > > > > > > best, > > > > Colin > > > > > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > On Thu, Sep 17, 2020 at 4:12 PM Colin McCabe <cmcc...@apache.org> > > wrote: > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > That's a fair point. I have moved the lease duration to the broker > > > > > > heartbeat response. That way lease durations can be changed just > > be > > > > > > reconfiguring the controllers. > > > > > > > > > > > > best, > > > > > > Colin > > > > > > > > > > > > On Wed, Sep 16, 2020, at 07:40, Unmesh Joshi wrote: > > > > > > > Thanks Colin, the changes look good to me. One small thing. > > > > > > > registration.lease.timeout.ms is the configuration on the > > controller > > > > > > side. > > > > > > > It will be good to comment on how brokers know about it, to be > > able > > > > to > > > > > > > send LeaseDurationMs > > > > > > > in the heartbeat request, > > > > > > > or else it can be added in the heartbeat response for brokers to > > know > > > > > > about > > > > > > > it. > > > > > > > > > > > > > > Thanks, > > > > > > > Unmesh > > > > > > > > > > > > > > On Fri, Sep 11, 2020 at 10:32 PM Colin McCabe < > > cmcc...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > I think you're right that we should use a duration here rather > > > > than a > > > > > > > > time. As you said, the clock on the controller will probably > > not > > > > > > match the > > > > > > > > one on the broker. I have updated the KIP. > > > > > > > > > > > > > > > > > > It's important to keep in mind that messages may be > > delayed in > > > > the > > > > > > > > > > network, or arrive out of order. When this happens, we > > will > > > > use > > > > > > the > > > > > > > > start > > > > > > > > > > time specified in the request to determine if the request > > is > > > > stale. > > > > > > > > > I am assuming that there will be a single TCP connection > > > > maintained > > > > > > > > between > > > > > > > > > broker and active controller. So, there won't be any out of > > order > > > > > > > > requests? > > > > > > > > > There will be a scenario of broker GC pause, which might > > cause > > > > > > connection > > > > > > > > > timeout and broker might need to reestablish the connection. > > If > > > > the > > > > > > pause > > > > > > > > > is too long, lease will expire and the heartbeat sent after > > the > > > > pause > > > > > > > > will > > > > > > > > > be treated as a new registration (similar to restart case), > > and > > > > a new > > > > > > > > epoch > > > > > > > > > number will be assigned to the broker. > > > > > > > > > > > > > > > > I agree with the end of this paragraph, but not with the start > > :) > > > > > > > > > > > > > > > > There can be out-of-order requests, since the broker will > > simply > > > > use a > > > > > > new > > > > > > > > TCP connection if the old one has problems. This can happen > > for a > > > > > > variety > > > > > > > > of reasons. I don't think GC pauses are the most common > > reason for > > > > > > this to > > > > > > > > happen. It's more common to see issues issues in the network > > > > itself > > > > > > that > > > > > > > > result connections getting dropped from time to time. > > > > > > > > > > > > > > > > So we have to assume that messages may arrive out of order, and > > > > > > possibly > > > > > > > > be delayed. I added a note that heartbeat requests should be > > > > ignored > > > > > > if > > > > > > > > the metadata log offset they contain is smaller than a previous > > > > > > heartbeat. > > > > > > > > > > > > > > > > > When the active controller fails, the new active controller > > > > needs to > > > > > > be > > > > > > > > > sure that it considers all the known brokers as alive till > > the > > > > lease > > > > > > > > > expiration interval. Because registration.lease.timeout.ms, > > is > > > > > > > > configured > > > > > > > > > on the controller, the new active controller will extend all > > the > > > > > > leases > > > > > > > > by > > > > > > > > > registration.lease.timeout.ms. I see that it won't need last > > > > > > heartbeat > > > > > > > > > time. > > > > > > > > > > > > > > > > Agreed. > > > > > > > > > > > > > > > > best, > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Unmesh > > > > > > > > > > > > > > > > > > On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe < > > cmcc...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > Colin wrote: > > > > > > > > > > > > The reason for including LeaseStartTimeMs in the > > request > > > > is to > > > > > > > > ensure > > > > > > > > > > > > that the time required to communicate with the > > controller > > > > gets > > > > > > > > > > included in > > > > > > > > > > > > the lease time. Since requests can potentially be > > delayed > > > > in > > > > > > the > > > > > > > > > > network > > > > > > > > > > > > for a long time, this is important. > > > > > > > > > > > > > > > > > > > > On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote: > > > > > > > > > > > The network time will be added anyway, because the lease > > > > timer > > > > > > on the > > > > > > > > > > > active controller will start only after the heartbeat > > request > > > > > > > > reaches the > > > > > > > > > > > server. > > > > > > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > > > > > If the start time is not specified in the request, then the > > > > network > > > > > > > > time > > > > > > > > > > is excluded from the heartbeat time. > > > > > > > > > > > > > > > > > > > > Here's an example: > > > > > > > > > > Let's say broker A sends a heartbeat at time 100, and it > > > > arrives > > > > > > on the > > > > > > > > > > controller at time 200, and the lease duration is 1000. > > > > > > > > > > > > > > > > > > > > The controller looks at the start time in the request, > > which is > > > > > > 100, > > > > > > > > and > > > > > > > > > > adds 1000 to it, getting 1100. On the other hand, if start > > > > time > > > > > > is not > > > > > > > > > > specified in the request, then the expiration will be at > > time > > > > 1200. > > > > > > > > > > That is what I mean by "the network time is included in the > > > > > > expiration > > > > > > > > > > time." > > > > > > > > > > > > > > > > > > > > > And I think, some assumption about network round trip > > time is > > > > > > > > > > > needed anyway to decide on the frequency of the > > heartbeat ( > > > > > > > > > > > registration.heartbeat.interval.ms), and lease timeout ( > > > > > > > > > > > registration.lease.timeout.ms). So I think just having a > > > > > > leaseTTL > > > > > > > > in the > > > > > > > > > > > request is easier to understand and implement. > > > > > > > > > > > > > > > > > > > > It's important to keep in mind that messages may be > > delayed in > > > > the > > > > > > > > > > network, or arrive out of order. When this happens, we > > will > > > > use > > > > > > the > > > > > > > > start > > > > > > > > > > time specified in the request to determine if the request > > is > > > > stale. > > > > > > > > > > > > > > > > > > > > > > Yes, I agree that the lease timeout on the controller > > side > > > > > > should > > > > > > > > be > > > > > > > > > > > > reset in the case of controller failover. The > > alternative > > > > > > would > > > > > > > > be to > > > > > > > > > > > > track the lease as hard state rather than soft state, > > but I > > > > > > think > > > > > > > > that > > > > > > > > > > > > is not really needed, and would result in more log > > entries. > > > > > > > > > > > My interpretation of the mention of BrokerRecord in the > > KIP > > > > was > > > > > > that > > > > > > > > this > > > > > > > > > > > record exists in the Raft log. > > > > > > > > > > > > > > > > > > > > BrokerRecord does exist in the Raft log, but does not > > include > > > > the > > > > > > last > > > > > > > > > > heartbeat time. > > > > > > > > > > > > > > > > > > > > > By soft state, do you mean the broker > > > > > > > > > > > records exist only on the active leader and will not be > > > > > > replicated > > > > > > > > in the > > > > > > > > > > > raft log? If the live brokers list is maintained only on > > the > > > > > > active > > > > > > > > > > > controller (raft leader), then, in case of leader > > failure, > > > > there > > > > > > > > will be > > > > > > > > > > a > > > > > > > > > > > window where the new leader does not know about the live > > > > brokers, > > > > > > > > till > > > > > > > > > > the > > > > > > > > > > > brokers establish the leases again. > > > > > > > > > > > I think it will be safer to have leases as a hard state > > > > managed > > > > > > by > > > > > > > > > > standard > > > > > > > > > > > Raft replication. > > > > > > > > > > > > > > > > > > > > Leases are short, so the need to re-establish them after a > > > > > > controller > > > > > > > > > > failover doesn't seem like a big problem. But this is > > > > something > > > > > > we can > > > > > > > > > > tweak if it becomes an issue. One option would be to have > > a > > > > > > separate > > > > > > > > log > > > > > > > > > > which is only used by the controller nodes for this (since, > > > > after > > > > > > all, > > > > > > > > > > brokers don't care about registration renewals). > > > > > > > > > > > > > > > > > > > > > Or am I misunderstanding something? (I assume that with > > soft > > > > > > state, > > > > > > > > you > > > > > > > > > > > mean something like zookeeper local sessions > > > > > > > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1147.) > > > > > > > > > > > > > > > > > > > > > > > Our code is single threaded as well. I think it makes > > > > sense > > > > > > for > > > > > > > > the > > > > > > > > > > > > controller, since otherwise locking becomes very messy. > > > > I'm > > > > > > not > > > > > > > > sure I > > > > > > > > > > > > understand your question about duplicate broker ID > > > > detection, > > > > > > > > though. > > > > > > > > > > > There's a section in the KIP about this -- is there a > > detail > > > > we > > > > > > > > should > > > > > > > > > > add > > > > > > > > > > > there? > > > > > > > > > > > > > > > > > > > > This is an implementation detail that doesn't need to be > > in the > > > > > > KIP. > > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > I assumed broker leases are implemented as a hard state. > > In > > > > that > > > > > > > > case, to > > > > > > > > > > > check for broker id conflict, we need to check the broker > > > > ids at > > > > > > two > > > > > > > > > > places > > > > > > > > > > > 1. Pending broker registrations (which are yet to be > > > > committed) > > > > > > 2. > > > > > > > > > > Already > > > > > > > > > > > committed broker registrations. > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > Unmesh > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe < > > > > cmcc...@apache.org > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote: > > > > > > > > > > > > > >>>Can you repeat your questions about broker leases? > > > > > > > > > > > > > > > > > > > > > > > > > > >>>>The LeaseStartTimeMs is expected to be the > > broker's > > > > > > > > > > > > > 'System.currentTimeMillis()' at the point of the > > > > request. The > > > > > > > > active > > > > > > > > > > > > > controller will add its lease period to this in order > > > > >>>>to > > > > > > > > compute > > > > > > > > > > the > > > > > > > > > > > > > LeaseEndTimeMs. > > > > > > > > > > > > > > > > > > > > > > > > > > I think the use of LeaseStartTimeMs and > > LeaseEndTimeMs > > > > in the > > > > > > > > KIP is > > > > > > > > > > a > > > > > > > > > > > > > bit > > > > > > > > > > > > > confusing. Monotonic Clock (System.nanoTime) on the > > > > active > > > > > > > > > > controller > > > > > > > > > > > > > should be used to track leases. > > > > > > > > > > > > > (For example, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1616https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114 > > > > > > > > > > > > > ) > > > > > > > > > > > > > > > > > > > > > > > > > > Then we will not need LeaseStartTimeMs? > > > > > > > > > > > > > Instead of LeaseStartTimeMs, can we call it > > LeaseTTL? The > > > > > > active > > > > > > > > > > > > controller > > > > > > > > > > > > > can then calculate LeaseEndTime = System.nanoTime() + > > > > > > LeaseTTL. > > > > > > > > > > > > > In this case we might just drop LeaseEndTimeMs from > > the > > > > > > > > response, as > > > > > > > > > > the > > > > > > > > > > > > > broker already knows about the TTL and can send > > > > heartbeats at > > > > > > > > some > > > > > > > > > > > > fraction > > > > > > > > > > > > > of TTL, say every TTL/4 milliseconds.(elapsed time > > on the > > > > > > broker > > > > > > > > > > measured > > > > > > > > > > > > > by System.nanoTime) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > > > > > > > > > I agree that the monotonic clock is probably a better > > idea > > > > > > here. > > > > > > > > It is > > > > > > > > > > > > good to be robust against wall clock changes, although > > I > > > > think > > > > > > a > > > > > > > > > > cluster > > > > > > > > > > > > which had them might suffer other issues. I will > > change > > > > it to > > > > > > > > specify > > > > > > > > > > a > > > > > > > > > > > > monotonic clock. > > > > > > > > > > > > > > > > > > > > > > > > The reason for including LeaseStartTimeMs in the > > request > > > > is to > > > > > > > > ensure > > > > > > > > > > that > > > > > > > > > > > > the time required to communicate with the controller > > gets > > > > > > included > > > > > > > > in > > > > > > > > > > the > > > > > > > > > > > > lease time. Since requests can potentially be delayed > > in > > > > the > > > > > > > > network > > > > > > > > > > for a > > > > > > > > > > > > long time, this is important. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I have a prototype built to demonstrate this as > > > > following: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala > > > > > > > > > > > > > > > > > > > > > > > > > > The Kip631Controller itself depends on a Consensus > > > > module, to > > > > > > > > > > demonstrate > > > > > > > > > > > > > how possible interactions with the consensus module > > will > > > > look > > > > > > > > like > > > > > > > > > > > > > (The Consensus can be pluggable really, with an API > > to > > > > allow > > > > > > > > reading > > > > > > > > > > > > > replicated log upto HighWaterMark) > > > > > > > > > > > > > > > > > > > > > > > > > > It has an implementation of LeaseTracker > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala > > > > > > > > > > > > > to demonstrate LeaseTracker's interaction with the > > > > consensus > > > > > > > > module. > > > > > > > > > > > > > > > > > > > > > > > > > > The implementation has the following aspects: > > > > > > > > > > > > > 1. The lease tracking happens only on the active > > > > controller > > > > > > (raft > > > > > > > > > > > > > leader) > > > > > > > > > > > > > 2. Once the lease expires, it needs to propose and > > > > commit a > > > > > > > > > > FenceBroker > > > > > > > > > > > > > record for that lease. > > > > > > > > > > > > > 3. In case of active controller failure, the lease > > will > > > > be > > > > > > > > tracked by > > > > > > > > > > > > > the > > > > > > > > > > > > > newly raft leader. The new raft leader starts the > > lease > > > > timer > > > > > > > > again, > > > > > > > > > > (as > > > > > > > > > > > > > implemented in onBecomingLeader method of > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala > > > > > > > > > > > > > ) > > > > > > > > > > > > > in effect extending the lease by the time spent in > > the > > > > leader > > > > > > > > > > election > > > > > > > > > > > > > and > > > > > > > > > > > > > whatever time was elapsed on the old leader. > > > > > > > > > > > > > > > > > > > > > > > > Yes, I agree that the lease timeout on the controller > > side > > > > > > should > > > > > > > > be > > > > > > > > > > reset > > > > > > > > > > > > in the case of controller failover. The alternative > > would > > > > be > > > > > > to > > > > > > > > track > > > > > > > > > > the > > > > > > > > > > > > lease as hard state rather than soft state, but I think > > > > that > > > > > > is not > > > > > > > > > > really > > > > > > > > > > > > needed, and would result in more log entries. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > There are working tests for this implementation here. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala > > > > > > > > > > > > > and an end to end test here > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala > > > > > > > > > > > > > < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala > > > > > > > > > > > > > > > > > > > > > > > > > > >>'m not sure what you mean by "de-duplication of the > > > > > > broker." > > > > > > > > Can > > > > > > > > > > you > > > > > > > > > > > > > give a little more context? > > > > > > > > > > > > > Apologies for using the confusing term > > deduplication. I > > > > meant > > > > > > > > broker > > > > > > > > > > id > > > > > > > > > > > > > conflict. > > > > > > > > > > > > > As you can see in the prototype handleRequest of > > > > > > KIP631Controller > > > > > > > > > > > > > < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala > > > > > > > > > > > > >, > > > > > > > > > > > > > the duplicate broker id needs to be detected before > > the > > > > > > > > BrokerRecord > > > > > > > > > > is > > > > > > > > > > > > > submitted to the raft module. > > > > > > > > > > > > > Also as implemented in the prototype, the > > > > KIP631Controller is > > > > > > > > single > > > > > > > > > > > > > threaded, handling requests one at a time. (an > > example of > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html > > > > > > > > > > > > > ) > > > > > > > > > > > > > > > > > > > > > > > > Our code is single threaded as well. I think it makes > > > > sense > > > > > > for > > > > > > > > the > > > > > > > > > > > > controller, since otherwise locking becomes very messy. > > > > I'm > > > > > > not > > > > > > > > sure I > > > > > > > > > > > > understand your question about duplicate broker ID > > > > detection, > > > > > > > > though. > > > > > > > > > > > > There's a section in the KIP about this -- is there a > > > > detail we > > > > > > > > should > > > > > > > > > > add > > > > > > > > > > > > there? > > > > > > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > Unmesh > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe < > > > > > > co...@cmccabe.xyz > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote: > > > > > > > > > > > > > > > Hi Colin, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > There were a few of questions I had.. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the response. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Were my comments on the broker lease > > > > implementation > > > > > > (and > > > > > > > > > > > > corresponding > > > > > > > > > > > > > > > prototype) appropriate and do we need to change > > the > > > > KIP > > > > > > > > > > > > > > > description accordingly?. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Can you repeat your questions about broker leases? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. How will broker epochs be generated? I am > > > > assuming it > > > > > > can > > > > > > > > be > > > > > > > > > > the > > > > > > > > > > > > > > > committed log offset (like zxid?) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > There isn't any need to use a log offset. We can > > just > > > > > > look at > > > > > > > > an > > > > > > > > > > > > > > in-memory hash table and see what the latest > > number is, > > > > > > and add > > > > > > > > > > one, to > > > > > > > > > > > > > > generate a new broker epoch. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. How will producer registration happen? I am > > > > assuming > > > > > > it > > > > > > > > > > should be > > > > > > > > > > > > > > > similar to broker registration, with a similar > > way to > > > > > > > > generate > > > > > > > > > > > > producer > > > > > > > > > > > > > > id. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For the EOS stuff, we will need a few new RPCs to > > the > > > > > > > > controller. > > > > > > > > > > I > > > > > > > > > > > > think > > > > > > > > > > > > > > we should do that in a follow-on KIP, though, since > > > > this > > > > > > one is > > > > > > > > > > already > > > > > > > > > > > > > > pretty big. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 4. Because we expose Raft log to all the > > brokers, any > > > > > > > > > > de-duplication > > > > > > > > > > > > of > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > broker needs to happen before the requests are > > > > proposed > > > > > > to > > > > > > > > Raft. > > > > > > > > > > For > > > > > > > > > > > > this > > > > > > > > > > > > > > > the controller needs to be single threaded, and > > > > should do > > > > > > > > > > validation > > > > > > > > > > > > > > > against the in-process or pending requests and > > the > > > > final > > > > > > > > state. I > > > > > > > > > > > > read a > > > > > > > > > > > > > > > mention of this, in the responses in this > > > > thread.Will it > > > > > > be > > > > > > > > > > useful to > > > > > > > > > > > > > > > mention this in the KIP? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm not sure what you mean by "de-duplication of > > the > > > > > > broker." > > > > > > > > Can > > > > > > > > > > you > > > > > > > > > > > > > > give a little more context? > > > > > > > > > > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > Unmesh > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe < > > > > > > > > cmcc...@apache.org > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm thinking of calling a vote on KIP-631 on > > > > Monday. > > > > > > Let > > > > > > > > me > > > > > > > > > > know > > > > > > > > > > > > if > > > > > > > > > > > > > > > > there's any more comments I should address > > before I > > > > > > start > > > > > > > > the > > > > > > > > > > vote. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > cheers, > > > > > > > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi > > wrote: > > > > > > > > > > > > > > > > > >>Hi Unmesh, > > > > > > > > > > > > > > > > > >>Thanks, I'll take a look. > > > > > > > > > > > > > > > > > Thanks. I will be adding more to the > > prototype > > > > and > > > > > > will > > > > > > > > be > > > > > > > > > > happy > > > > > > > > > > > > to > > > > > > > > > > > > > > help > > > > > > > > > > > > > > > > > and collaborate. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > Unmesh > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin > > McCabe < > > > > > > > > > > > > cmcc...@apache.org> > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Jose, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > That'a s good point that I hadn't > > considered. > > > > It's > > > > > > > > > > probably > > > > > > > > > > > > worth > > > > > > > > > > > > > > > > having > > > > > > > > > > > > > > > > > > a separate leader change message, as you > > > > mentioned. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, I'll take a look. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > best, > > > > > > > > > > > > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia > > > > Sancio > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Unmesh, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Very cool prototype! > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Colin, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The KIP proposes a record called > > IsrChange > > > > which > > > > > > > > > > includes the > > > > > > > > > > > > > > > > > > > partition, topic, isr, leader and leader > > > > epoch. > > > > > > > > During > > > > > > > > > > normal > > > > > > > > > > > > > > > > > > > operation ISR changes do not result in > > leader > > > > > > > > changes. > > > > > > > > > > > > Similarly, > > > > > > > > > > > > > > > > > > > leader changes do not necessarily > > involve ISR > > > > > > > > changes. > > > > > > > > > > The > > > > > > > > > > > > > > controller > > > > > > > > > > > > > > > > > > > implementation that uses ZK modeled them > > > > together > > > > > > > > because > > > > > > > > > > > > > > > > > > > 1. All of this information is stored in > > one > > > > > > znode. > > > > > > > > > > > > > > > > > > > 2. ZK's optimistic lock requires that you > > > > specify > > > > > > > > the new > > > > > > > > > > > > value > > > > > > > > > > > > > > > > > > completely > > > > > > > > > > > > > > > > > > > 3. The change to that znode was being > > > > performed > > > > > > by > > > > > > > > both > > > > > > > > > > the > > > > > > > > > > > > > > > > controller > > > > > > > > > > > > > > > > > > > and the leader. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > None of these reasons are true in > > KIP-500. > > > > Have > > > > > > we > > > > > > > > > > considered > > > > > > > > > > > > > > having > > > > > > > > > > > > > > > > > > > two different records? For example > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. IsrChange record which includes topic, > > > > > > partition, > > > > > > > > isr > > > > > > > > > > > > > > > > > > > 2. LeaderChange record which includes > > topic, > > > > > > > > partition, > > > > > > > > > > > > leader > > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > > > leader epoch. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I suspect that making this change will > > also > > > > > > require > > > > > > > > > > changing > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > message AlterIsrRequest introduced in > > > > KIP-497: > > > > > > Add > > > > > > > > > > > > inter-broker > > > > > > > > > > > > > > API > > > > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > > alter ISR. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks > > > > > > > > > > > > > > > > > > > -Jose > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >