Hey Jun, Motivated by your suggestion, I think we can also store the information about created replicas in a per-broker znode at /brokers/created_replicas/ids/[id]. Does this sound good?
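For concreteness, a strawman layout for that per-broker znode could look roughly like the following (partition lists keyed by topic; none of the field names are final):

    /brokers/created_replicas/ids/[brokerId] =>
    {
      "version" : 1,
      "created" : {
        "topic1" : [0, 1, 2],
        "topic2" : [3, 4]
      }
    }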
Regards, Dong On Tue, Feb 21, 2017 at 2:37 PM, Dong Lin <lindon...@gmail.com> wrote: > Hey Jun, > > Thanks much for your comments. > > I actually proposed the design to store both offline replicas and created > replicas in a per-broker znode before switching to the design in the current > KIP. The current design stores created replicas in a per-partition znode and > transmits offline replicas via LeaderAndIsrResponse. The original solution > is roughly the same as what you suggested. The advantage of the current > solution is kind of philosophical: 1) we want to transmit data (e.g. > offline replicas) using RPC and reduce dependency on zookeeper; 2) we want > the controller to be the only one that determines any state (e.g. offline > replicas) that will be exposed to the user. The advantage of the solution that > stores offline replicas in zookeeper is that we can save one roundtrip > for the controller to handle log directory failure. However, this extra > roundtrip should not be a big deal since log directory failure is > rare and the extra latency is less of a problem when there is > a log directory failure. > > Do you think the two philosophical advantages of the current KIP make > sense? If not, then I can switch to the original design that stores offline > replicas in zookeeper. It is actually written already. One disadvantage is > that we have to make non-trivial changes to the KIP (e.g. no create flag in > LeaderAndIsrRequest and no created flag in zookeeper) and restart this KIP > discussion. > > Regarding 21, it seems to me that LeaderAndIsrRequest/StopReplicaRequest > only makes sense when the broker can make the choice (e.g. whether to fetch data for this > replica or not). In the case that the log directory of the replica is > already offline, the broker has to stop fetching data for this replica > regardless of what the controller tells it to do. Thus it seems cleaner for > the broker to stop fetching data for this replica immediately. The advantage of > this solution is that the controller logic is simpler since it doesn't need > to send StopReplicaRequest in case of log directory failure, and the log4j > log is also cleaner. Is there a specific advantage of having the controller > tell the broker to stop fetching data for offline replicas? > > Regarding 22, I agree with your observation that it can happen. I will > update the KIP and specify that the broker will exit with a proper error message > in the log and the user needs to manually remove partitions and restart the > broker. > > Thanks! > Dong > > > > On Mon, Feb 20, 2017 at 10:17 PM, Jun Rao <j...@confluent.io> wrote: > >> Hi, Dong, >> >> Sorry for the delay. A few more comments. >> >> 20. One complexity that I found in the current KIP is that the way the >> broker communicates failed replicas to the controller is inefficient. When >> a log directory fails, the broker only sends an indication through ZK to >> the controller and the controller has to issue a LeaderAndIsrRequest to >> discover which replicas are offline due to log directory failure. An >> alternative approach is that when a log directory fails, the broker just >> writes the failed directory and the corresponding topic partitions in a >> new failed log directory ZK path like the following. >> >> Failed log directory path: >> /brokers/ids/[brokerId]/failed-log-directory/directory1 => { json of the >> topic partitions in the log directory }. >> >> The controller just watches for child changes in >> /brokers/ids/[brokerId]/failed-log-directory. 
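(For illustration only, the value stored under such a path might look roughly like the strawman below; the exact fields are an assumption, not something specified above.)

    /brokers/ids/[brokerId]/failed-log-directory/directory1 =>
    {
      "version" : 1,
      "partitions" : {
        "topic1" : [0, 3],
        "topic2" : [1]
      }
    }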
>> After reading this path, the controller knows the exact set of replicas that >> are offline and can trigger the replica state changes accordingly. This >> saves an extra round of LeaderAndIsrRequest handling. >> >> With this new ZK path, we can probably get rid of /broker/topics/[topic]/ >> partitions/[partitionId]/controller_managed_state. The creation of a new >> replica is expected to always succeed unless all log directories fail, in >> which case, the broker goes down anyway. Then, during controller failover, >> the controller just needs to additionally read from ZK the extra failed >> log >> directory paths, which is far fewer than the number of topics or partitions. >> >> On broker startup, if a log directory becomes available, the corresponding >> log directory path in ZK will be removed. >> >> The downside of this approach is that the value of this new ZK path can be >> large. However, even with 5K partitions per log directory and 100 bytes per >> partition, the size of the value is 500KB, still less than the default 1MB >> znode limit in ZK. >> >> 21. "Broker will remove offline replica from its replica fetcher threads." >> The proposal lets the broker remove the replica from the replica fetcher >> thread when it detects a directory failure. An alternative is to only do >> that after the broker receives the LeaderAndIsrRequest/StopReplicaRequest. >> The benefit of this is that the controller is the only one who decides >> which replicas are removed from the replica fetcher threads. The broker >> also doesn't need additional logic to remove the replica from the replica >> fetcher threads. The downside is that in a small window, the replica fetch >> thread will keep writing to the failed log directory and may pollute the >> log4j log. >> >> 22. In the current design, there is a potential corner case issue that the >> same partition may exist in more than one log directory at some point. >> Consider the following steps: (1) a new topic t1 is created and the >> controller sends LeaderAndIsrRequest to a broker; (2) the broker creates >> partition t1-p1 in log dir1; (3) before the broker sends a response, it >> goes down; (4) the broker is restarted with log dir1 unreadable; (5) the >> broker receives a new LeaderAndIsrRequest and creates partition t1-p1 on >> log dir2; (6) at some point, the broker is restarted with log dir1 fixed. >> Now partition t1-p1 is in two log dirs. The alternative approach that I >> suggested above may suffer from a similar corner case issue. Since this is >> rare, if the broker detects this during broker startup, it can probably >> just log an error and exit. The admin can remove the redundant partitions >> manually and then restart the broker. >> >> Thanks, >> >> Jun >> >> On Sat, Feb 18, 2017 at 9:31 PM, Dong Lin <lindon...@gmail.com> wrote: >> >> > Hey Jun, >> > >> > Could you please let me know if the solutions above could address your >> > concern? I really want to move the discussion forward. >> > >> > Thanks, >> > Dong >> > >> > >> > On Tue, Feb 14, 2017 at 8:17 PM, Dong Lin <lindon...@gmail.com> wrote: >> > >> > > Hey Jun, >> > > >> > > Thanks for all your help and time to discuss this KIP. When you get >> the >> > > time, could you let me know if the previous answers address the >> concern? >> > > >> > > I think the more interesting question in your last email is where we >> > > should store the "created" flag in ZK. I proposed the solution that I >> > like >> > > most, i.e. to store it together with the replica assignment data in >> > /brokers/topics/[topic]. 
>> > > In order to expedite discussion, let me provide another two ideas to >> > > address the concern just in case the first idea doesn't work: >> > > >> > > - We can avoid extra controller ZK read when there is no disk failure >> > > (95% of time?). When controller starts, it doesn't >> > > read controller_managed_state in ZK and sends LeaderAndIsrRequest with >> > > "create = false". Only if LeaderAndIsrResponse shows failure for any >> > > replica, then controller will read controller_managed_state for this >> > > partition and re-send LeaderAndIsrRequset with "create=true" if this >> > > replica has not been created. >> > > >> > > - We can significantly reduce this ZK read time by making >> > > controller_managed_state a topic level information in ZK, e.g. >> > > /brokers/topics/[topic]/state. Given that most topic has 10+ >> partition, >> > > the extra ZK read time should be less than 10% of the existing total >> zk >> > > read time during controller failover. >> > > >> > > Thanks! >> > > Dong >> > > >> > > >> > > On Tue, Feb 14, 2017 at 7:30 AM, Dong Lin <lindon...@gmail.com> >> wrote: >> > > >> > >> Hey Jun, >> > >> >> > >> I just realized that you may be suggesting that a tool for listing >> > >> offline directories is necessary for KIP-112 by asking whether >> KIP-112 >> > and >> > >> KIP-113 will be in the same release. I think such a tool is useful >> but >> > >> doesn't have to be included in KIP-112. This is because as of now >> admin >> > >> needs to log into broker machine and check broker log to figure out >> the >> > >> cause of broker failure and the bad log directory in case of disk >> > failure. >> > >> The KIP-112 won't make it harder since admin can still figure out the >> > bad >> > >> log directory by doing the same thing. Thus it is probably OK to just >> > >> include this script in KIP-113. Regardless, my hope is to finish both >> > KIPs >> > >> ASAP and make them in the same release since both KIPs are needed for >> > the >> > >> JBOD setup. >> > >> >> > >> Thanks, >> > >> Dong >> > >> >> > >> On Mon, Feb 13, 2017 at 5:52 PM, Dong Lin <lindon...@gmail.com> >> wrote: >> > >> >> > >>> And the test plan has also been updated to simulate disk failure by >> > >>> changing log directory permission to 000. >> > >>> >> > >>> On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin <lindon...@gmail.com> >> wrote: >> > >>> >> > >>>> Hi Jun, >> > >>>> >> > >>>> Thanks for the reply. These comments are very helpful. Let me >> answer >> > >>>> them inline. >> > >>>> >> > >>>> >> > >>>> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <j...@confluent.io> wrote: >> > >>>> >> > >>>>> Hi, Dong, >> > >>>>> >> > >>>>> Thanks for the reply. A few more replies and new comments below. >> > >>>>> >> > >>>>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindon...@gmail.com> >> > wrote: >> > >>>>> >> > >>>>> > Hi Jun, >> > >>>>> > >> > >>>>> > Thanks for the detailed comments. Please see answers inline: >> > >>>>> > >> > >>>>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <j...@confluent.io> >> wrote: >> > >>>>> > >> > >>>>> > > Hi, Dong, >> > >>>>> > > >> > >>>>> > > Thanks for the updated wiki. A few comments below. >> > >>>>> > > >> > >>>>> > > 1. Topics get created >> > >>>>> > > 1.1 Instead of storing successfully created replicas in ZK, >> could >> > >>>>> we >> > >>>>> > store >> > >>>>> > > unsuccessfully created replicas in ZK? Since the latter is >> less >> > >>>>> common, >> > >>>>> > it >> > >>>>> > > probably reduces the load on ZK. 
>> > >>>>> > > >> > >>>>> > >> > >>>>> > We can store unsuccessfully created replicas in ZK. But I am not >> > >>>>> sure if >> > >>>>> > that can reduce write load on ZK. >> > >>>>> > >> > >>>>> > If we want to reduce write load on ZK using by store >> unsuccessfully >> > >>>>> created >> > >>>>> > replicas in ZK, then broker should not write to ZK if all >> replicas >> > >>>>> are >> > >>>>> > successfully created. It means that if >> > /broker/topics/[topic]/partiti >> > >>>>> > ons/[partitionId]/controller_managed_state doesn't exist in ZK >> for >> > >>>>> a given >> > >>>>> > partition, we have to assume all replicas of this partition have >> > been >> > >>>>> > successfully created and send LeaderAndIsrRequest with create = >> > >>>>> false. This >> > >>>>> > becomes a problem if controller crashes before receiving >> > >>>>> > LeaderAndIsrResponse to validate whether a replica has been >> > created. >> > >>>>> > >> > >>>>> > I think this approach and reduce the number of bytes stored in >> ZK. >> > >>>>> But I am >> > >>>>> > not sure if this is a concern. >> > >>>>> > >> > >>>>> > >> > >>>>> > >> > >>>>> I was mostly concerned about the controller failover time. >> Currently, >> > >>>>> the >> > >>>>> controller failover is likely dominated by the cost of reading >> > >>>>> topic/partition level information from ZK. If we add another >> > partition >> > >>>>> level path in ZK, it probably will double the controller failover >> > >>>>> time. If >> > >>>>> the approach of representing the non-created replicas doesn't >> work, >> > >>>>> have >> > >>>>> you considered just adding the created flag in the leaderAndIsr >> path >> > >>>>> in ZK? >> > >>>>> >> > >>>>> >> > >>>> Yes, I have considered adding the created flag in the leaderAndIsr >> > path >> > >>>> in ZK. If we were to add created flag per replica in the >> > >>>> LeaderAndIsrRequest, then it requires a lot of change in the code >> > base. >> > >>>> >> > >>>> If we don't add created flag per replica in the >> LeaderAndIsrRequest, >> > >>>> then the information in leaderAndIsr path in ZK and >> > LeaderAndIsrRequest >> > >>>> would be different. Further, the procedure for broker to update ISR >> > in ZK >> > >>>> will be a bit complicated. When leader updates leaderAndIsr path in >> > ZK, it >> > >>>> will have to first read created flags from ZK, change isr, and >> write >> > >>>> leaderAndIsr back to ZK. And it needs to check znode version and >> > re-try >> > >>>> write operation in ZK if controller has updated ZK during this >> > period. This >> > >>>> is in contrast to the current implementation where the leader >> either >> > gets >> > >>>> all the information from LeaderAndIsrRequest sent by controller, or >> > >>>> determine the infromation by itself (e.g. ISR), before writing to >> > >>>> leaderAndIsr path in ZK. >> > >>>> >> > >>>> It seems to me that the above solution is a bit complicated and not >> > >>>> clean. Thus I come up with the design in this KIP to store this >> > created >> > >>>> flag in a separate zk path. The path is named >> > controller_managed_state to >> > >>>> indicate that we can store in this znode all information that is >> > managed by >> > >>>> controller only, as opposed to ISR. >> > >>>> >> > >>>> I agree with your concern of increased ZK read time during >> controller >> > >>>> failover. How about we store the "created" information in the >> > >>>> znode /brokers/topics/[topic]? 
We can change that znode to have the >> > >>>> following data format: >> > >>>> >> > >>>> { >> > >>>> "version" : 2, >> > >>>> "created" : { >> > >>>> "1" : [1, 2, 3], >> > >>>> ... >> > >>>> } >> > >>>> "partition" : { >> > >>>> "1" : [1, 2, 3], >> > >>>> ... >> > >>>> } >> > >>>> } >> > >>>> >> > >>>> We won't have extra zk read using this solution. It also seems >> > >>>> reasonable to put the partition assignment information together >> with >> > >>>> replica creation information. The latter is only changed once after >> > the >> > >>>> partition is created or re-assigned. >> > >>>> >> > >>>> >> > >>>>> >> > >>>>> >> > >>>>> > >> > >>>>> > > 1.2 If an error is received for a follower, does the >> controller >> > >>>>> eagerly >> > >>>>> > > remove it from ISR or do we just let the leader removes it >> after >> > >>>>> timeout? >> > >>>>> > > >> > >>>>> > >> > >>>>> > No, Controller will not actively remove it from ISR. But >> controller >> > >>>>> will >> > >>>>> > recognize it as offline replica and propagate this information >> to >> > all >> > >>>>> > brokers via UpdateMetadataRequest. Each leader can use this >> > >>>>> information to >> > >>>>> > actively remove offline replica from ISR set. I have updated to >> > wiki >> > >>>>> to >> > >>>>> > clarify it. >> > >>>>> > >> > >>>>> > >> > >>>>> >> > >>>>> That seems inconsistent with how the controller deals with offline >> > >>>>> replicas >> > >>>>> due to broker failures. When that happens, the broker will (1) >> select >> > >>>>> a new >> > >>>>> leader if the offline replica is the leader; (2) remove the >> replica >> > >>>>> from >> > >>>>> ISR if the offline replica is the follower. So, intuitively, it >> seems >> > >>>>> that >> > >>>>> we should be doing the same thing when dealing with offline >> replicas >> > >>>>> due to >> > >>>>> disk failure. >> > >>>>> >> > >>>> >> > >>>> My bad. I misunderstand how the controller currently handles broker >> > >>>> failure and ISR change. Yes we should do the same thing when >> dealing >> > with >> > >>>> offline replicas here. I have updated the KIP to specify that, >> when an >> > >>>> offline replica is discovered by controller, the controller removes >> > offline >> > >>>> replicas from ISR in the ZK and sends LeaderAndIsrRequest with >> > updated ISR >> > >>>> to be used by partition leaders. >> > >>>> >> > >>>> >> > >>>>> >> > >>>>> >> > >>>>> >> > >>>>> > >> > >>>>> > > 1.3 Similar, if an error is received for a leader, should the >> > >>>>> controller >> > >>>>> > > trigger leader election again? >> > >>>>> > > >> > >>>>> > >> > >>>>> > Yes, controller will trigger leader election if leader replica >> is >> > >>>>> offline. >> > >>>>> > I have updated the wiki to clarify it. >> > >>>>> > >> > >>>>> > >> > >>>>> > > >> > >>>>> > > 2. A log directory stops working on a broker during runtime: >> > >>>>> > > 2.1 It seems the broker remembers the failed directory after >> > >>>>> hitting an >> > >>>>> > > IOException and the failed directory won't be used for >> creating >> > new >> > >>>>> > > partitions until the broker is restarted? If so, could you add >> > >>>>> that to >> > >>>>> > the >> > >>>>> > > wiki. >> > >>>>> > > >> > >>>>> > >> > >>>>> > Right, broker assumes a log directory to be good after it >> starts, >> > >>>>> and mark >> > >>>>> > log directory as bad once there is IOException when broker >> attempts >> > >>>>> to >> > >>>>> > access the log directory. New replicas will only be created on >> good >> > >>>>> log >> > >>>>> > directory. 
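A minimal sketch of that broker-side pattern, using hypothetical names rather than the actual Kafka code, could be to wrap every access to a log directory so that the first IOException marks the whole directory (and all replicas on it) offline:

    // Hypothetical sketch only, not the actual Kafka implementation.
    import java.io.IOException;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class LogDirFailureSketch {
        private final Set<String> offlineLogDirs = ConcurrentHashMap.newKeySet();

        // Wrap any read or write against a log directory (append, read, cleaning,
        // checkpointing, ...) so that an IOException marks the directory offline.
        <T> T protect(String logDir, IOAction<T> action) throws IOException {
            if (offlineLogDirs.contains(logDir))
                throw new IOException("Log directory " + logDir + " is offline");
            try {
                return action.run();
            } catch (IOException e) {
                offlineLogDirs.add(logDir);
                // ... also notify the controller, e.g. via the ZK notification path
                throw e;
            }
        }

        interface IOAction<T> {
            T run() throws IOException;
        }
    }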
I just added this to the KIP. >> > >>>>> > >> > >>>>> > >> > >>>>> > > 2.2 Could you be a bit more specific on how and during which >> > >>>>> operation >> > >>>>> > the >> > >>>>> > > broker detects directory failure? Is it when the broker hits >> an >> > >>>>> > IOException >> > >>>>> > > during writes, or both reads and writes? For example, during >> > >>>>> broker >> > >>>>> > > startup, it only reads from each of the log directories, if it >> > >>>>> hits an >> > >>>>> > > IOException there, does the broker immediately mark the >> directory >> > >>>>> as >> > >>>>> > > offline? >> > >>>>> > > >> > >>>>> > >> > >>>>> > Broker marks log directory as bad once there is IOException when >> > >>>>> broker >> > >>>>> > attempts to access the log directory. This includes read and >> write. >> > >>>>> These >> > >>>>> > operations include log append, log read, log cleaning, watermark >> > >>>>> checkpoint >> > >>>>> > etc. If broker hits IOException when it reads from each of the >> log >> > >>>>> > directory during startup, it immediately mark the directory as >> > >>>>> offline. >> > >>>>> > >> > >>>>> > I just updated the KIP to clarify it. >> > >>>>> > >> > >>>>> > >> > >>>>> > > 3. Partition reassignment: If we know a replica is offline, >> do we >> > >>>>> still >> > >>>>> > > want to send StopReplicaRequest to it? >> > >>>>> > > >> > >>>>> > >> > >>>>> > No, controller doesn't send StopReplicaRequest for an offline >> > >>>>> replica. >> > >>>>> > Controller treats this scenario in the same way that exiting >> Kafka >> > >>>>> > implementation does when the broker of this replica is offline. >> > >>>>> > >> > >>>>> > >> > >>>>> > > >> > >>>>> > > 4. UpdateMetadataRequestPartitionState: For >> offline_replicas, do >> > >>>>> they >> > >>>>> > only >> > >>>>> > > include offline replicas due to log directory failures or do >> they >> > >>>>> also >> > >>>>> > > include offline replicas due to broker failure? >> > >>>>> > > >> > >>>>> > >> > >>>>> > UpdateMetadataRequestPartitionState's offline_replicas include >> > >>>>> offline >> > >>>>> > replicas due to both log directory failure and broker failure. >> This >> > >>>>> is to >> > >>>>> > make the semantics of this field easier to understand. Broker >> can >> > >>>>> > distinguish whether a replica is offline due to broker failure >> or >> > >>>>> disk >> > >>>>> > failure by checking whether a broker is live in the >> > >>>>> UpdateMetadataRequest. >> > >>>>> > >> > >>>>> > >> > >>>>> > > >> > >>>>> > > 5. Tools: Could we add some kind of support in the tool to >> list >> > >>>>> offline >> > >>>>> > > directories? >> > >>>>> > > >> > >>>>> > >> > >>>>> > In KIP-112 we don't have tools to list offline directories >> because >> > >>>>> we have >> > >>>>> > intentionally avoided exposing log directory information (e.g. >> log >> > >>>>> > directory path) to user or other brokers. I think we can add >> this >> > >>>>> feature >> > >>>>> > in KIP-113, in which we will have DescribeDirsRequest to list >> log >> > >>>>> directory >> > >>>>> > information (e.g. partition assignment, path, size) needed for >> > >>>>> rebalance. >> > >>>>> > >> > >>>>> > >> > >>>>> Since we are introducing a new failure mode, if a replica becomes >> > >>>>> offline >> > >>>>> due to failure in log directories, the first thing an admin wants >> to >> > >>>>> know >> > >>>>> is which log directories are offline from the broker's >> perspective. >> > >>>>> So, >> > >>>>> including such a tool will be useful. 
Do you plan to do KIP-112 >> and >> > >>>>> KIP-113 >> > >>>>> in the same release? >> > >>>>> >> > >>>>> >> > >>>> Yes, I agree that including such a tool is using. This is probably >> > >>>> better to be added in KIP-113 because we need DescribeDirsRequest >> to >> > get >> > >>>> this information. I will update KIP-113 to include this tool. >> > >>>> >> > >>>> I plan to do KIP-112 and KIP-113 separately to make each KIP and >> their >> > >>>> patch easier to review. I don't have any plan about which release >> to >> > have >> > >>>> these KIPs. My plan is to both of them ASAP. Is there particular >> > timeline >> > >>>> you prefer for code of these two KIPs to checked-in? >> > >>>> >> > >>>> >> > >>>>> > >> > >>>>> > > >> > >>>>> > > 6. Metrics: Could we add some metrics to show offline >> > directories? >> > >>>>> > > >> > >>>>> > >> > >>>>> > Sure. I think it makes sense to have each broker report its >> number >> > of >> > >>>>> > offline replicas and offline log directories. The previous >> metric >> > >>>>> was put >> > >>>>> > in KIP-113. I just added both metrics in KIP-112. >> > >>>>> > >> > >>>>> > >> > >>>>> > > >> > >>>>> > > 7. There are still references to kafka-log-dirs.sh. Are they >> > valid? >> > >>>>> > > >> > >>>>> > >> > >>>>> > My bad. I just removed this from "Changes in Operational >> > Procedures" >> > >>>>> and >> > >>>>> > "Test Plan" in the KIP. >> > >>>>> > >> > >>>>> > >> > >>>>> > > >> > >>>>> > > 8. Do you think KIP-113 is ready for review? One thing that >> > KIP-113 >> > >>>>> > > mentions during partition reassignment is to first send >> > >>>>> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It >> > seems >> > >>>>> it's >> > >>>>> > > better if the replicas are created in the right log directory >> in >> > >>>>> the >> > >>>>> > first >> > >>>>> > > place? The reason that I brought it up here is because it may >> > >>>>> affect the >> > >>>>> > > protocol of LeaderAndIsrRequest. >> > >>>>> > > >> > >>>>> > >> > >>>>> > Yes, KIP-113 is ready for review. The advantage of the current >> > >>>>> design is >> > >>>>> > that we can keep LeaderAndIsrRequest log-direcotry-agnostic. The >> > >>>>> > implementation would be much easier to read if all log related >> > logic >> > >>>>> (e.g. >> > >>>>> > various errors) are put in ChangeReplicadIRrequest and the code >> > path >> > >>>>> of >> > >>>>> > handling replica movement is separated from leadership handling. >> > >>>>> > >> > >>>>> > In other words, I think Kafka may be easier to develop in the >> long >> > >>>>> term if >> > >>>>> > we separate these two requests. >> > >>>>> > >> > >>>>> > I agree that ideally we want to create replicas in the right log >> > >>>>> directory >> > >>>>> > in the first place. But I am not sure if there is any >> performance >> > or >> > >>>>> > correctness concern with the existing way of moving it after it >> is >> > >>>>> created. >> > >>>>> > Besides, does this decision affect the change proposed in >> KIP-112? >> > >>>>> > >> > >>>>> > >> > >>>>> I am just wondering if you have considered including the log >> > directory >> > >>>>> for >> > >>>>> the replicas in the LeaderAndIsrRequest. >> > >>>>> >> > >>>>> >> > >>>> Yeah I have thought about this idea, but only briefly. I rejected >> this >> > >>>> idea because log directory is broker's local information and I >> prefer >> > not >> > >>>> to expose local config information to the cluster through >> > >>>> LeaderAndIsrRequest. >> > >>>> >> > >>>> >> > >>>>> 9. 
Could you describe when the offline replicas due to log >> directory >> > >>>>> failure are removed from the replica fetch threads? >> > >>>>> >> > >>>> >> > >>>> Yes. If the offline replica was a leader, either a new leader is >> > >>>> elected or all follower brokers will stop fetching for this >> > partition. If >> > >>>> the offline replica is a follower, the broker will stop fetching >> for >> > this >> > >>>> replica immediately. A broker stops fetching data for a replica by >> > removing >> > >>>> the replica from the replica fetch threads. I have updated the KIP >> to >> > >>>> clarify it. >> > >>>> >> > >>>> >> > >>>>> >> > >>>>> 10. The wiki mentioned changing the log directory to a file for >> > >>>>> simulating >> > >>>>> disk failure in system tests. Could we just change the permission >> of >> > >>>>> the >> > >>>>> log directory to 000 to simulate that? >> > >>>>> >> > >>>> >> > >>>> >> > >>>> Sure, >> > >>>> >> > >>>> >> > >>>>> >> > >>>>> Thanks, >> > >>>>> >> > >>>>> Jun >> > >>>>> >> > >>>>> >> > >>>>> > > Jun >> > >>>>> > > >> > >>>>> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin < >> lindon...@gmail.com> >> > >>>>> wrote: >> > >>>>> > > >> > >>>>> > > > Hi Jun, >> > >>>>> > > > >> > >>>>> > > > Can I replace zookeeper access with direct RPC for both ISR >> > >>>>> > notification >> > >>>>> > > > and disk failure notification in a future KIP, or do you >> feel >> > we >> > >>>>> should >> > >>>>> > > do >> > >>>>> > > > it in this KIP? >> > >>>>> > > > >> > >>>>> > > > Hi Eno, Grant and everyone, >> > >>>>> > > > >> > >>>>> > > > Is there further improvement you would like to see with this >> > KIP? >> > >>>>> > > > >> > >>>>> > > > Thanks you all for the comments, >> > >>>>> > > > >> > >>>>> > > > Dong >> > >>>>> > > > >> > >>>>> > > > >> > >>>>> > > > >> > >>>>> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin < >> lindon...@gmail.com> >> > >>>>> wrote: >> > >>>>> > > > >> > >>>>> > > > > >> > >>>>> > > > > >> > >>>>> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe < >> > >>>>> cmcc...@apache.org> >> > >>>>> > > wrote: >> > >>>>> > > > > >> > >>>>> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote: >> > >>>>> > > > >> > Thanks for all the comments Colin! >> > >>>>> > > > >> > >> > >>>>> > > > >> > To answer your questions: >> > >>>>> > > > >> > - Yes, a broker will shutdown if all its log >> directories >> > >>>>> are bad. >> > >>>>> > > > >> >> > >>>>> > > > >> That makes sense. Can you add this to the writeup? >> > >>>>> > > > >> >> > >>>>> > > > > >> > >>>>> > > > > Sure. This has already been added. You can find it here >> > >>>>> > > > > <https://cwiki.apache.org/conf >> luence/pages/diffpagesbyversio >> > >>>>> n.action >> > >>>>> > ? >> > >>>>> > > > pageId=67638402&selectedPageVe >> rsions=9&selectedPageVersions= >> > 10> >> > >>>>> > > > > . >> > >>>>> > > > > >> > >>>>> > > > > >> > >>>>> > > > >> >> > >>>>> > > > >> > - I updated the KIP to explicitly state that a log >> > >>>>> directory will >> > >>>>> > be >> > >>>>> > > > >> > assumed to be good until broker sees IOException when >> it >> > >>>>> tries to >> > >>>>> > > > access >> > >>>>> > > > >> > the log directory. >> > >>>>> > > > >> >> > >>>>> > > > >> Thanks. >> > >>>>> > > > >> >> > >>>>> > > > >> > - Controller doesn't explicitly know whether there is >> new >> > >>>>> log >> > >>>>> > > > directory >> > >>>>> > > > >> > or >> > >>>>> > > > >> > not. 
All controller knows is whether replicas are >> online >> > or >> > >>>>> > offline >> > >>>>> > > > >> based >> > >>>>> > > > >> > on LeaderAndIsrResponse. According to the existing >> Kafka >> > >>>>> > > > implementation, >> > >>>>> > > > >> > controller will always send LeaderAndIsrRequest to a >> > broker >> > >>>>> after >> > >>>>> > it >> > >>>>> > > > >> > bounces. >> > >>>>> > > > >> >> > >>>>> > > > >> I thought so. It's good to clarify, though. Do you >> think >> > >>>>> it's >> > >>>>> > worth >> > >>>>> > > > >> adding a quick discussion of this on the wiki? >> > >>>>> > > > >> >> > >>>>> > > > > >> > >>>>> > > > > Personally I don't think it is needed. If broker starts >> with >> > >>>>> no bad >> > >>>>> > log >> > >>>>> > > > > directory, everything should work it is and we should not >> > need >> > >>>>> to >> > >>>>> > > clarify >> > >>>>> > > > > it. The KIP has already covered the scenario when a broker >> > >>>>> starts >> > >>>>> > with >> > >>>>> > > > bad >> > >>>>> > > > > log directory. Also, the KIP doesn't claim or hint that we >> > >>>>> support >> > >>>>> > > > dynamic >> > >>>>> > > > > addition of new log directories. I think we are good. >> > >>>>> > > > > >> > >>>>> > > > > >> > >>>>> > > > >> best, >> > >>>>> > > > >> Colin >> > >>>>> > > > >> >> > >>>>> > > > >> > >> > >>>>> > > > >> > Please see this >> > >>>>> > > > >> > <https://cwiki.apache.org/conf >> > >>>>> luence/pages/diffpagesbyversio >> > >>>>> > > > >> n.action?pageId=67638402&selectedPageVersions=9& >> > >>>>> > > > selectedPageVersions=10> >> > >>>>> > > > >> > for the change of the KIP. >> > >>>>> > > > >> > >> > >>>>> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe < >> > >>>>> cmcc...@apache.org >> > >>>>> > > >> > >>>>> > > > >> wrote: >> > >>>>> > > > >> > >> > >>>>> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote: >> > >>>>> > > > >> > > > Thanks, Dong L. >> > >>>>> > > > >> > > > >> > >>>>> > > > >> > > > Do we plan on bringing down the broker process when >> > all >> > >>>>> log >> > >>>>> > > > >> directories >> > >>>>> > > > >> > > > are offline? >> > >>>>> > > > >> > > > >> > >>>>> > > > >> > > > Can you explicitly state on the KIP that the log >> dirs >> > >>>>> are all >> > >>>>> > > > >> considered >> > >>>>> > > > >> > > > good after the broker process is bounced? It seems >> > >>>>> like an >> > >>>>> > > > >> important >> > >>>>> > > > >> > > > thing to be clear about. Also, perhaps discuss how >> > the >> > >>>>> > > controller >> > >>>>> > > > >> > > > becomes aware of the newly good log directories >> after >> > a >> > >>>>> broker >> > >>>>> > > > >> bounce >> > >>>>> > > > >> > > > (and whether this triggers re-election). >> > >>>>> > > > >> > > >> > >>>>> > > > >> > > I meant to write, all the log dirs where the broker >> can >> > >>>>> still >> > >>>>> > read >> > >>>>> > > > the >> > >>>>> > > > >> > > index and some other files. Clearly, log dirs that >> are >> > >>>>> > completely >> > >>>>> > > > >> > > inaccessible will still be considered bad after a >> broker >> > >>>>> process >> > >>>>> > > > >> bounce. 
>> > >>>>> > > > >> > > >> > >>>>> > > > >> > > best, >> > >>>>> > > > >> > > Colin >> > >>>>> > > > >> > > >> > >>>>> > > > >> > > > >> > >>>>> > > > >> > > > +1 (non-binding) aside from that >> > >>>>> > > > >> > > > >> > >>>>> > > > >> > > > >> > >>>>> > > > >> > > > >> > >>>>> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote: >> > >>>>> > > > >> > > > > Hi all, >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > Thank you all for the helpful suggestion. I have >> > >>>>> updated the >> > >>>>> > > KIP >> > >>>>> > > > >> to >> > >>>>> > > > >> > > > > address >> > >>>>> > > > >> > > > > the comments received so far. See here >> > >>>>> > > > >> > > > > <https://cwiki.apache.org/conf >> > >>>>> > luence/pages/diffpagesbyversio >> > >>>>> > > > >> n.action? >> > >>>>> > > > >> > > pageId=67638402&selectedPageVe >> > >>>>> rsions=8&selectedPageVersions= >> > >>>>> > 9>to >> > >>>>> > > > >> > > > > read the changes of the KIP. Here is a summary of >> > >>>>> change: >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > - Updated the Proposed Change section to change >> the >> > >>>>> recovery >> > >>>>> > > > >> steps. >> > >>>>> > > > >> > > After >> > >>>>> > > > >> > > > > this change, broker will also create replica as >> long >> > >>>>> as all >> > >>>>> > > log >> > >>>>> > > > >> > > > > directories >> > >>>>> > > > >> > > > > are working. >> > >>>>> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since >> user >> > >>>>> no >> > >>>>> > longer >> > >>>>> > > > >> needs to >> > >>>>> > > > >> > > > > use >> > >>>>> > > > >> > > > > it for recovery from bad disks. >> > >>>>> > > > >> > > > > - Explained how the znode >> controller_managed_state >> > is >> > >>>>> > managed >> > >>>>> > > in >> > >>>>> > > > >> the >> > >>>>> > > > >> > > > > Public >> > >>>>> > > > >> > > > > interface section. >> > >>>>> > > > >> > > > > - Explained what happens during controller >> failover, >> > >>>>> > partition >> > >>>>> > > > >> > > > > reassignment >> > >>>>> > > > >> > > > > and topic deletion in the Proposed Change >> section. >> > >>>>> > > > >> > > > > - Updated Future Work section to include the >> > following >> > >>>>> > > potential >> > >>>>> > > > >> > > > > improvements >> > >>>>> > > > >> > > > > - Let broker notify controller of ISR change >> and >> > >>>>> disk >> > >>>>> > state >> > >>>>> > > > >> change >> > >>>>> > > > >> > > via >> > >>>>> > > > >> > > > > RPC instead of using zookeeper >> > >>>>> > > > >> > > > > - Handle various failure scenarios (e.g. slow >> > disk) >> > >>>>> on a >> > >>>>> > > > >> case-by-case >> > >>>>> > > > >> > > > > basis. For example, we may want to detect slow >> disk >> > >>>>> and >> > >>>>> > > consider >> > >>>>> > > > >> it as >> > >>>>> > > > >> > > > > offline. >> > >>>>> > > > >> > > > > - Allow admin to mark a directory as bad so >> that >> > it >> > >>>>> will >> > >>>>> > not >> > >>>>> > > > be >> > >>>>> > > > >> used. >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > Thanks, >> > >>>>> > > > >> > > > > Dong >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin < >> > >>>>> > lindon...@gmail.com >> > >>>>> > > > >> > >>>>> > > > >> wrote: >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > > Hey Eno, >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > Thanks much for the comment! 
>> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > I still think the complexity added to Kafka is >> > >>>>> justified >> > >>>>> > by >> > >>>>> > > > its >> > >>>>> > > > >> > > benefit. >> > >>>>> > > > >> > > > > > Let me provide my reasons below. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > 1) The additional logic is easy to understand >> and >> > >>>>> thus its >> > >>>>> > > > >> complexity >> > >>>>> > > > >> > > > > > should be reasonable. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > On the broker side, it needs to catch exception >> > when >> > >>>>> > access >> > >>>>> > > > log >> > >>>>> > > > >> > > directory, >> > >>>>> > > > >> > > > > > mark log directory and all its replicas as >> > offline, >> > >>>>> notify >> > >>>>> > > > >> > > controller by >> > >>>>> > > > >> > > > > > writing the zookeeper notification path, and >> > >>>>> specify error >> > >>>>> > > in >> > >>>>> > > > >> > > > > > LeaderAndIsrResponse. On the controller side, >> it >> > >>>>> will >> > >>>>> > > listener >> > >>>>> > > > >> to >> > >>>>> > > > >> > > > > > zookeeper for disk failure notification, learn >> > about >> > >>>>> > offline >> > >>>>> > > > >> > > replicas in >> > >>>>> > > > >> > > > > > the LeaderAndIsrResponse, and take offline >> > replicas >> > >>>>> into >> > >>>>> > > > >> > > consideration when >> > >>>>> > > > >> > > > > > electing leaders. It also mark replica as >> created >> > in >> > >>>>> > > zookeeper >> > >>>>> > > > >> and >> > >>>>> > > > >> > > use it >> > >>>>> > > > >> > > > > > to determine whether a replica is created. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > That is all the logic we need to add in Kafka. >> I >> > >>>>> > personally >> > >>>>> > > > feel >> > >>>>> > > > >> > > this is >> > >>>>> > > > >> > > > > > easy to reason about. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > 2) The additional code is not much. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > I expect the code for KIP-112 to be around 1100 >> > >>>>> lines new >> > >>>>> > > > code. >> > >>>>> > > > >> > > Previously >> > >>>>> > > > >> > > > > > I have implemented a prototype of a slightly >> > >>>>> different >> > >>>>> > > design >> > >>>>> > > > >> (see >> > >>>>> > > > >> > > here >> > >>>>> > > > >> > > > > > <https://docs.google.com/docum >> > >>>>> ent/d/1Izza0SBmZMVUBUt9s_ >> > >>>>> > > > >> > > -Dqi3D8e0KGJQYW8xgEdRsgAI/edit>) >> > >>>>> > > > >> > > > > > and uploaded it to github (see here >> > >>>>> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD >> >). >> > >>>>> The >> > >>>>> > patch >> > >>>>> > > > >> changed >> > >>>>> > > > >> > > 33 >> > >>>>> > > > >> > > > > > files, added 1185 lines and deleted 183 lines. >> The >> > >>>>> size of >> > >>>>> > > > >> prototype >> > >>>>> > > > >> > > patch >> > >>>>> > > > >> > > > > > is actually smaller than patch of KIP-107 (see >> > here >> > >>>>> > > > >> > > > > > <https://github.com/apache/kafka/pull/2476>) >> > which >> > >>>>> is >> > >>>>> > > already >> > >>>>> > > > >> > > accepted. >> > >>>>> > > > >> > > > > > The KIP-107 patch changed 49 files, added 1349 >> > >>>>> lines and >> > >>>>> > > > >> deleted 141 >> > >>>>> > > > >> > > lines. 
>> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > 3) Comparison with >> one-broker-per-multiple-volume >> > s >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > This KIP can improve the availability of Kafka >> in >> > >>>>> this >> > >>>>> > case >> > >>>>> > > > such >> > >>>>> > > > >> > > that one >> > >>>>> > > > >> > > > > > failed volume doesn't bring down the entire >> > broker. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > 4) Comparison with one-broker-per-volume >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > If each volume maps to multiple disks, then we >> > >>>>> still have >> > >>>>> > > > >> similar >> > >>>>> > > > >> > > problem >> > >>>>> > > > >> > > > > > such that the broker will fail if any disk of >> the >> > >>>>> volume >> > >>>>> > > > failed. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > If each volume maps to one disk, it means that >> we >> > >>>>> need to >> > >>>>> > > > >> deploy 10 >> > >>>>> > > > >> > > > > > brokers on a machine if the machine has 10 >> disks. >> > I >> > >>>>> will >> > >>>>> > > > >> explain the >> > >>>>> > > > >> > > > > > concern with this approach in order of their >> > >>>>> importance. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > - It is weird if we were to tell kafka user to >> > >>>>> deploy 50 >> > >>>>> > > > >> brokers on a >> > >>>>> > > > >> > > > > > machine of 50 disks. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > - Either when user deploys Kafka on a >> commercial >> > >>>>> cloud >> > >>>>> > > > platform >> > >>>>> > > > >> or >> > >>>>> > > > >> > > when >> > >>>>> > > > >> > > > > > user deploys their own cluster, the size or >> > largest >> > >>>>> disk >> > >>>>> > is >> > >>>>> > > > >> usually >> > >>>>> > > > >> > > > > > limited. There will be scenarios where user >> want >> > to >> > >>>>> > increase >> > >>>>> > > > >> broker >> > >>>>> > > > >> > > > > > capacity by having multiple disks per broker. >> This >> > >>>>> JBOD >> > >>>>> > KIP >> > >>>>> > > > >> makes it >> > >>>>> > > > >> > > > > > feasible without hurting availability due to >> > single >> > >>>>> disk >> > >>>>> > > > >> failure. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > - Automatic load rebalance across disks will be >> > >>>>> easier and >> > >>>>> > > > more >> > >>>>> > > > >> > > flexible >> > >>>>> > > > >> > > > > > if one broker has multiple disks. This can be >> > >>>>> future work. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > - There is performance concern when you deploy >> 10 >> > >>>>> broker >> > >>>>> > > vs. 1 >> > >>>>> > > > >> > > broker on >> > >>>>> > > > >> > > > > > one machine. The metadata the cluster, >> including >> > >>>>> > > FetchRequest, >> > >>>>> > > > >> > > > > > ProduceResponse, MetadataRequest and so on will >> > all >> > >>>>> be 10X >> > >>>>> > > > >> more. The >> > >>>>> > > > >> > > > > > packet-per-second will be 10X higher which may >> > limit >> > >>>>> > > > >> performance if >> > >>>>> > > > >> > > pps is >> > >>>>> > > > >> > > > > > the performance bottleneck. The number of >> socket >> > on >> > >>>>> the >> > >>>>> > > > machine >> > >>>>> > > > >> is >> > >>>>> > > > >> > > 10X >> > >>>>> > > > >> > > > > > higher. And the number of replication thread >> will >> > >>>>> be 100X >> > >>>>> > > > more. 
>> > >>>>> > > > >> The >> > >>>>> > > > >> > > impact >> > >>>>> > > > >> > > > > > will be more significant with increasing >> number of >> > >>>>> disks >> > >>>>> > per >> > >>>>> > > > >> > > machine. Thus >> > >>>>> > > > >> > > > > > it will limit Kakfa's scalability in the long >> > term. >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > Thanks, >> > >>>>> > > > >> > > > > > Dong >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska < >> > >>>>> > > > >> eno.there...@gmail.com >> > >>>>> > > > >> > > > >> > >>>>> > > > >> > > > > > wrote: >> > >>>>> > > > >> > > > > > >> > >>>>> > > > >> > > > > >> Hi Dong, >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> To simplify the discussion today, on my part >> I'll >> > >>>>> zoom >> > >>>>> > into >> > >>>>> > > > one >> > >>>>> > > > >> > > thing >> > >>>>> > > > >> > > > > >> only: >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> - I'll discuss the options called below : >> > >>>>> > > > >> "one-broker-per-disk" or >> > >>>>> > > > >> > > > > >> "one-broker-per-few-disks". >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments >> so >> > >>>>> there is >> > >>>>> > > no >> > >>>>> > > > >> need to >> > >>>>> > > > >> > > > > >> discuss that part for me. I buy it that JBODs >> are >> > >>>>> good. >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> I find the terminology can be improved a bit. >> > >>>>> Ideally >> > >>>>> > we'd >> > >>>>> > > be >> > >>>>> > > > >> > > talking >> > >>>>> > > > >> > > > > >> about volumes, not disks. Just to make it >> clear >> > >>>>> that >> > >>>>> > Kafka >> > >>>>> > > > >> > > understand >> > >>>>> > > > >> > > > > >> volumes/directories, not individual raw >> disks. So >> > >>>>> by >> > >>>>> > > > >> > > > > >> "one-broker-per-few-disks" what I mean is that >> > the >> > >>>>> admin >> > >>>>> > > can >> > >>>>> > > > >> pool a >> > >>>>> > > > >> > > few >> > >>>>> > > > >> > > > > >> disks together to create a volume/directory >> and >> > >>>>> give that >> > >>>>> > > to >> > >>>>> > > > >> Kafka. >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> The kernel of my question will be that the >> admin >> > >>>>> already >> > >>>>> > > has >> > >>>>> > > > >> tools >> > >>>>> > > > >> > > to 1) >> > >>>>> > > > >> > > > > >> create volumes/directories from a JBOD and 2) >> > >>>>> start a >> > >>>>> > > broker >> > >>>>> > > > >> on a >> > >>>>> > > > >> > > desired >> > >>>>> > > > >> > > > > >> machine and 3) assign a broker resources like >> a >> > >>>>> > directory. >> > >>>>> > > I >> > >>>>> > > > >> claim >> > >>>>> > > > >> > > that >> > >>>>> > > > >> > > > > >> those tools are sufficient to optimise >> resource >> > >>>>> > allocation. >> > >>>>> > > > I >> > >>>>> > > > >> > > understand >> > >>>>> > > > >> > > > > >> that a broker could manage point 3) itself, ie >> > >>>>> juggle the >> > >>>>> > > > >> > > directories. My >> > >>>>> > > > >> > > > > >> question is whether the complexity added to >> Kafka >> > >>>>> is >> > >>>>> > > > justified. >> > >>>>> > > > >> > > > > >> Operationally it seems to me an admin will >> still >> > >>>>> have to >> > >>>>> > do >> > >>>>> > > > >> all the >> > >>>>> > > > >> > > three >> > >>>>> > > > >> > > > > >> items above. 
>> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> Looking forward to the discussion >> > >>>>> > > > >> > > > > >> Thanks >> > >>>>> > > > >> > > > > >> Eno >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> >> > >>>>> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin < >> > >>>>> lindon...@gmail.com >> > >>>>> > > >> > >>>>> > > > >> wrote: >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > Hey Eno, >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > Thanks much for the review. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > I think your suggestion is to split disks >> of a >> > >>>>> machine >> > >>>>> > > into >> > >>>>> > > > >> > > multiple >> > >>>>> > > > >> > > > > >> disk >> > >>>>> > > > >> > > > > >> > sets and run one broker per disk set. Yeah >> this >> > >>>>> is >> > >>>>> > > similar >> > >>>>> > > > to >> > >>>>> > > > >> > > Colin's >> > >>>>> > > > >> > > > > >> > suggestion of one-broker-per-disk, which we >> > have >> > >>>>> > > evaluated >> > >>>>> > > > at >> > >>>>> > > > >> > > LinkedIn >> > >>>>> > > > >> > > > > >> and >> > >>>>> > > > >> > > > > >> > considered it to be a good short term >> approach. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > As of now I don't think any of these >> approach >> > is >> > >>>>> a >> > >>>>> > better >> > >>>>> > > > >> > > alternative in >> > >>>>> > > > >> > > > > >> > the long term. I will summarize these here. >> I >> > >>>>> have put >> > >>>>> > > > these >> > >>>>> > > > >> > > reasons in >> > >>>>> > > > >> > > > > >> the >> > >>>>> > > > >> > > > > >> > KIP's motivation section and rejected >> > alternative >> > >>>>> > > section. >> > >>>>> > > > I >> > >>>>> > > > >> am >> > >>>>> > > > >> > > happy to >> > >>>>> > > > >> > > > > >> > discuss more and I would certainly like to >> use >> > an >> > >>>>> > > > alternative >> > >>>>> > > > >> > > solution >> > >>>>> > > > >> > > > > >> that >> > >>>>> > > > >> > > > > >> > is easier to do with better performance. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > - JBOD vs. RAID-10: if we switch from >> RAID-10 >> > >>>>> with >> > >>>>> > > > >> > > > > >> replication-factoer=2 to >> > >>>>> > > > >> > > > > >> > JBOD with replicatio-factor=3, we get 25% >> > >>>>> reduction in >> > >>>>> > > disk >> > >>>>> > > > >> usage >> > >>>>> > > > >> > > and >> > >>>>> > > > >> > > > > >> > doubles the tolerance of broker failure >> before >> > >>>>> data >> > >>>>> > > > >> > > unavailability from >> > >>>>> > > > >> > > > > >> 1 >> > >>>>> > > > >> > > > > >> > to 2. This is pretty huge gain for any >> company >> > >>>>> that >> > >>>>> > uses >> > >>>>> > > > >> Kafka at >> > >>>>> > > > >> > > large >> > >>>>> > > > >> > > > > >> > scale. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit >> of >> > >>>>> > > > >> > > one-broker-per-disk is >> > >>>>> > > > >> > > > > >> that >> > >>>>> > > > >> > > > > >> > no major code change is needed in Kafka. 
>> Among >> > >>>>> the >> > >>>>> > > > >> disadvantage of >> > >>>>> > > > >> > > > > >> > one-broker-per-disk summarized in the KIP >> and >> > >>>>> previous >> > >>>>> > > > email >> > >>>>> > > > >> with >> > >>>>> > > > >> > > Colin, >> > >>>>> > > > >> > > > > >> > the biggest one is the 15% throughput loss >> > >>>>> compared to >> > >>>>> > > JBOD >> > >>>>> > > > >> and >> > >>>>> > > > >> > > less >> > >>>>> > > > >> > > > > >> > flexibility to balance across disks. >> Further, >> > it >> > >>>>> > probably >> > >>>>> > > > >> requires >> > >>>>> > > > >> > > > > >> change >> > >>>>> > > > >> > > > > >> > to internal deployment tools at various >> > >>>>> companies to >> > >>>>> > deal >> > >>>>> > > > >> with >> > >>>>> > > > >> > > > > >> > one-broker-per-disk setup. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup that >> used >> > at >> > >>>>> > > > Microsoft. >> > >>>>> > > > >> The >> > >>>>> > > > >> > > > > >> problem is >> > >>>>> > > > >> > > > > >> > that a broker becomes unavailable if any >> disk >> > >>>>> fail. >> > >>>>> > > Suppose >> > >>>>> > > > >> > > > > >> > replication-factor=2 and there are 10 disks >> per >> > >>>>> > machine. >> > >>>>> > > > >> Then the >> > >>>>> > > > >> > > > > >> > probability of of any message becomes >> > >>>>> unavailable due >> > >>>>> > to >> > >>>>> > > > disk >> > >>>>> > > > >> > > failure >> > >>>>> > > > >> > > > > >> with >> > >>>>> > > > >> > > > > >> > RAID-0 is 100X higher than that with JBOD. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks: >> > >>>>> > > > one-broker-per-few-disk >> > >>>>> > > > >> is >> > >>>>> > > > >> > > > > >> somewhere >> > >>>>> > > > >> > > > > >> > between one-broker-per-disk and RAID-0. So >> it >> > >>>>> carries >> > >>>>> > an >> > >>>>> > > > >> averaged >> > >>>>> > > > >> > > > > >> > disadvantages of these two approaches. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > To answer your question regarding, I think >> it >> > is >> > >>>>> > > reasonable >> > >>>>> > > > >> to >> > >>>>> > > > >> > > mange >> > >>>>> > > > >> > > > > >> disk >> > >>>>> > > > >> > > > > >> > in Kafka. By "managing disks" we mean the >> > >>>>> management of >> > >>>>> > > > >> > > assignment of >> > >>>>> > > > >> > > > > >> > replicas across disks. Here are my reasons >> in >> > >>>>> more >> > >>>>> > > detail: >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > - I don't think this KIP is a big step >> change. >> > By >> > >>>>> > > allowing >> > >>>>> > > > >> user to >> > >>>>> > > > >> > > > > >> > configure Kafka to run multiple log >> directories >> > >>>>> or >> > >>>>> > disks >> > >>>>> > > as >> > >>>>> > > > >> of >> > >>>>> > > > >> > > now, it >> > >>>>> > > > >> > > > > >> is >> > >>>>> > > > >> > > > > >> > implicit that Kafka manages disks. It is >> just >> > >>>>> not a >> > >>>>> > > > complete >> > >>>>> > > > >> > > feature. >> > >>>>> > > > >> > > > > >> > Microsoft and probably other companies are >> > using >> > >>>>> this >> > >>>>> > > > feature >> > >>>>> > > > >> > > under the >> > >>>>> > > > >> > > > > >> > undesirable effect that a broker will fail >> any >> > >>>>> if any >> > >>>>> > > disk >> > >>>>> > > > >> fail. >> > >>>>> > > > >> > > It is >> > >>>>> > > > >> > > > > >> good >> > >>>>> > > > >> > > > > >> > to complete this feature. 
>> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > - I think it is reasonable to manage disk in >> > >>>>> Kafka. One >> > >>>>> > > of >> > >>>>> > > > >> the >> > >>>>> > > > >> > > most >> > >>>>> > > > >> > > > > >> > important work that Kafka is doing is to >> > >>>>> determine the >> > >>>>> > > > >> replica >> > >>>>> > > > >> > > > > >> assignment >> > >>>>> > > > >> > > > > >> > across brokers and make sure enough copies >> of a >> > >>>>> given >> > >>>>> > > > >> replica is >> > >>>>> > > > >> > > > > >> available. >> > >>>>> > > > >> > > > > >> > I would argue that it is not much different >> > than >> > >>>>> > > > determining >> > >>>>> > > > >> the >> > >>>>> > > > >> > > replica >> > >>>>> > > > >> > > > > >> > assignment across disk conceptually. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > - I would agree that this KIP is improve >> > >>>>> performance of >> > >>>>> > > > >> Kafka at >> > >>>>> > > > >> > > the >> > >>>>> > > > >> > > > > >> cost >> > >>>>> > > > >> > > > > >> > of more complexity inside Kafka, by >> switching >> > >>>>> from >> > >>>>> > > RAID-10 >> > >>>>> > > > to >> > >>>>> > > > >> > > JBOD. I >> > >>>>> > > > >> > > > > >> would >> > >>>>> > > > >> > > > > >> > argue that this is a right direction. If we >> can >> > >>>>> gain >> > >>>>> > 20%+ >> > >>>>> > > > >> > > performance by >> > >>>>> > > > >> > > > > >> > managing NIC in Kafka as compared to >> existing >> > >>>>> approach >> > >>>>> > > and >> > >>>>> > > > >> other >> > >>>>> > > > >> > > > > >> > alternatives, I would say we should just do >> it. >> > >>>>> Such a >> > >>>>> > > gain >> > >>>>> > > > >> in >> > >>>>> > > > >> > > > > >> performance, >> > >>>>> > > > >> > > > > >> > or equivalently reduction in cost, can save >> > >>>>> millions of >> > >>>>> > > > >> dollars >> > >>>>> > > > >> > > per year >> > >>>>> > > > >> > > > > >> > for any company running Kafka at large >> scale. >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > Thanks, >> > >>>>> > > > >> > > > > >> > Dong >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno >> Thereska < >> > >>>>> > > > >> > > eno.there...@gmail.com> >> > >>>>> > > > >> > > > > >> wrote: >> > >>>>> > > > >> > > > > >> > >> > >>>>> > > > >> > > > > >> >> I'm coming somewhat late to the discussion, >> > >>>>> apologies >> > >>>>> > > for >> > >>>>> > > > >> that. >> > >>>>> > > > >> > > > > >> >> >> > >>>>> > > > >> > > > > >> >> I'm worried about this proposal. It's >> moving >> > >>>>> Kafka to >> > >>>>> > a >> > >>>>> > > > >> world >> > >>>>> > > > >> > > where it >> > >>>>> > > > >> > > > > >> >> manages disks. So in a sense, the scope of >> the >> > >>>>> KIP is >> > >>>>> > > > >> limited, >> > >>>>> > > > >> > > but the >> > >>>>> > > > >> > > > > >> >> direction it sets for Kafka is quite a big >> > step >> > >>>>> > change. >> > >>>>> > > > >> > > Fundamentally >> > >>>>> > > > >> > > > > >> this >> > >>>>> > > > >> > > > > >> >> is about balancing resources for a Kafka >> > >>>>> broker. This >> > >>>>> > > can >> > >>>>> > > > be >> > >>>>> > > > >> > > done by a >> > >>>>> > > > >> > > > > >> >> tool, rather than by changing Kafka. 
E.g., >> the >> > >>>>> tool >> > >>>>> > > would >> > >>>>> > > > >> take a >> > >>>>> > > > >> > > bunch >> > >>>>> > > > >> > > > > >> of >> > >>>>> > > > >> > > > > >> >> disks together, create a volume over them >> and >> > >>>>> export >> > >>>>> > > that >> > >>>>> > > > >> to a >> > >>>>> > > > >> > > Kafka >> > >>>>> > > > >> > > > > >> broker >> > >>>>> > > > >> > > > > >> >> (in addition to setting the memory limits >> for >> > >>>>> that >> > >>>>> > > broker >> > >>>>> > > > or >> > >>>>> > > > >> > > limiting >> > >>>>> > > > >> > > > > >> other >> > >>>>> > > > >> > > > > >> >> resources). A different bunch of disks can >> > then >> > >>>>> make >> > >>>>> > up >> > >>>>> > > a >> > >>>>> > > > >> second >> > >>>>> > > > >> > > > > >> volume, >> > >>>>> > > > >> > > > > >> >> and be used by another Kafka broker. This >> is >> > >>>>> aligned >> > >>>>> > > with >> > >>>>> > > > >> what >> > >>>>> > > > >> > > Colin is >> > >>>>> > > > >> > > > > >> >> saying (as I understand it). >> > >>>>> > > > >> > > > > >> >> >> > >>>>> > > > >> > > > > >> >> Disks are not the only resource on a >> machine, >> > >>>>> there >> > >>>>> > are >> > >>>>> > > > >> several >> > >>>>> > > > >> > > > > >> instances >> > >>>>> > > > >> > > > > >> >> where multiple NICs are used for example. >> Do >> > we >> > >>>>> want >> > >>>>> > > fine >> > >>>>> > > > >> grained >> > >>>>> > > > >> > > > > >> >> management of all these resources? I'd >> argue >> > >>>>> that >> > >>>>> > opens >> > >>>>> > > us >> > >>>>> > > > >> the >> > >>>>> > > > >> > > system >> > >>>>> > > > >> > > > > >> to a >> > >>>>> > > > >> > > > > >> >> lot of complexity. >> > >>>>> > > > >> > > > > >> >> >> > >>>>> > > > >> > > > > >> >> Thanks >> > >>>>> > > > >> > > > > >> >> Eno >> > >>>>> > > > >> > > > > >> >> >> > >>>>> > > > >> > > > > >> >> >> > >>>>> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin < >> > >>>>> > lindon...@gmail.com >> > >>>>> > > > >> > >>>>> > > > >> wrote: >> > >>>>> > > > >> > > > > >> >>> >> > >>>>> > > > >> > > > > >> >>> Hi all, >> > >>>>> > > > >> > > > > >> >>> >> > >>>>> > > > >> > > > > >> >>> I am going to initiate the vote If there >> is >> > no >> > >>>>> > further >> > >>>>> > > > >> concern >> > >>>>> > > > >> > > with >> > >>>>> > > > >> > > > > >> the >> > >>>>> > > > >> > > > > >> >> KIP. >> > >>>>> > > > >> > > > > >> >>> >> > >>>>> > > > >> > > > > >> >>> Thanks, >> > >>>>> > > > >> > > > > >> >>> Dong >> > >>>>> > > > >> > > > > >> >>> >> > >>>>> > > > >> > > > > >> >>> >> > >>>>> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai < >> > >>>>> > > > >> > > radai.rosenbl...@gmail.com> >> > >>>>> > > > >> > > > > >> >> wrote: >> > >>>>> > > > >> > > > > >> >>> >> > >>>>> > > > >> > > > > >> >>>> a few extra points: >> > >>>>> > > > >> > > > > >> >>>> >> > >>>>> > > > >> > > > > >> >>>> 1. 
>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote:
>>> Hi all,
>>> I am going to initiate the vote if there is no further concern with the KIP.
>>> Thanks,
>>> Dong
>>> On Fri, Jan 27, 2017 at 8:08 PM, radai <radai.rosenbl...@gmail.com> wrote:
>>>> A few extra points:
>>>> 1. Broker-per-disk might also incur more client <--> broker sockets: suppose every producer/consumer "talks" to more than one partition; there is a very good chance that partitions that were co-located on a single 10-disk broker would now be split between several single-disk broker processes on the same machine. It is hard to put a multiplier on this, but it is likely >x1. Sockets are a limited resource at the OS level and incur some memory cost (kernel buffers).
>>>> 2. There is a memory overhead to spinning up a JVM (compiled code, byte code objects, etc.). If we assume this overhead is ~300 MB (order of magnitude; specifics vary), then spinning up 10 JVMs would lose you 3 GB of RAM. Not a ton, but non-negligible.
>>>> 3. There would also be some overhead downstream of Kafka in any management/monitoring/log aggregation system, though likely less than x10.
>>>> 4. (Related to the above) added complexity of administration with more running instances.
>>>> Is anyone running Kafka with anywhere near 100 GB heaps? I thought the point was to rely on the kernel page cache to do the disk buffering...
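To put a rough number on point 1 above: assuming, purely for this estimate, that a client's partitions on a machine are spread uniformly and independently across the ten per-disk brokers (the thread itself does not state a placement model), the expected number of TCP connections per client to that machine could be approximated as follows.

    // Back-of-envelope for the socket multiplier in point 1.
    // Assumption (mine, for illustration): a client's P partitions on this machine are
    // placed uniformly and independently across D = 10 per-disk brokers.
    public class SocketMultiplier {
        public static void main(String[] args) {
            int d = 10; // per-disk broker processes on one machine
            for (int p : new int[]{1, 2, 5, 10, 20}) {
                // Expected number of distinct brokers hit = D * (1 - (1 - 1/D)^P),
                // versus exactly 1 connection to a single JBOD broker.
                double expected = d * (1 - Math.pow(1.0 - 1.0 / d, p));
                System.out.printf("partitions=%2d -> expected connections per client ~ %.1f (vs 1)%n",
                        p, expected);
            }
        }
    }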
>>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>>> Hey Colin,
>>>>> Thanks much for the comment. Please see my comments inline.
>>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org> wrote:
>>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>>>>>>> Hey Colin,
>>>>>>> Good point! Yeah, we have actually considered and tested this solution, which we call one-broker-per-disk. It would work and should require no major change in Kafka as compared to this JBOD KIP, so it would be a good short-term solution.
>>>>>>> But it has a few drawbacks which make it less desirable in the long term. Assume we have 10 disks on a machine. Here are the problems:
>>>>>> Hi Dong,
>>>>>> Thanks for the thoughtful reply.
>>>>>>> 1) Our stress test result shows that one-broker-per-disk has 15% lower throughput.
>>>>>>> 2) The controller would need to send 10X as many LeaderAndIsrRequest, UpdateMetadataRequest and StopReplicaRequest. This increases the burden on the controller, which can be the performance bottleneck.
>>>>>> Maybe I'm misunderstanding something, but there would not be 10x as many StopReplicaRequest RPCs, would there? The other requests would increase 10x, but from a pretty low base, right? We are not reassigning partitions all the time, I hope (or else we have bigger problems...).
>>>>> I think the controller will group StopReplicaRequests per broker and send only one StopReplicaRequest to a broker during controlled shutdown. Anyway, we don't have to worry about this if we agree that the other requests will increase by 10X. One UpdateMetadataRequest is sent to each broker in the cluster every time there is a leadership change. I am not sure this is a real problem, but in theory it makes the overhead O(number of brokers) and may be a concern in the future. Ideally we should avoid it.
>>>>>>> 3) Less efficient use of the physical resources on the machine. The number of sockets on each machine will increase by 10X. The number of connections between any two machines will increase by 100X.
>>>>>>> 4) Less efficient way to manage memory and quota.
>>>>>>> 5) Rebalance between disks/brokers on the same machine will be less efficient and less flexible. A broker has to read data from another broker on the same machine via a socket. It is also harder to do automatic load balancing between disks on the same machine in the future.
>>>>>>> I will put this and the explanation in the rejected alternatives section.
>>>>>>> I have a few questions:
>>>>>>> - Can you explain why this solution can help avoid the scalability bottleneck? I actually think it will exacerbate the scalability problem due to 2) above.
>>>>>>> - Why can we push more RPCs with this solution?
>>>>>> To really answer this question we'd have to take a deep dive into the locking of the broker and figure out how effectively it can parallelize truly independent requests. Almost every multithreaded process is going to have shared state, like shared queues or shared sockets, that is going to make scaling less than linear when you add disks or processors. (And clearly, another option is to improve that scalability, rather than going multi-process!)
>>>>> Yeah, I also think it is better to improve scalability inside the Kafka code if possible. I am not sure we currently have any scalability issue inside Kafka that cannot be removed without going multi-process.
>>>>>>> - It is true that a garbage collection in one broker would not affect the others. But that is only after every broker uses just 1/10 of the memory. Can we be sure that this will actually help performance?
>>>>>> The big question is, how much memory do Kafka brokers use now, and how much will they use in the future? Our experience in HDFS was that once you start getting more than 100-200 GB Java heap sizes, full GCs start taking minutes to finish when using the standard JVMs. That alone is a good reason to go multi-process or to consider storing more things off the Java heap.
>>>>> I see. Now I agree one-broker-per-disk should be more efficient in terms of GC, since each broker probably needs less than 1/10 of the memory available on a typical machine nowadays. I will remove this from the reasons for rejection.
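As a small illustration of the "store more things off the Java heap" option mentioned above: data kept in direct buffers lives outside the GC-managed heap, so it does not inflate heap size or full-GC pause times the way on-heap objects do. The snippet below is purely illustrative and is not Kafka code.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Purely illustrative example of off-heap storage; not part of Kafka or this KIP.
    public class OffHeapExample {
        public static void main(String[] args) {
            // A direct buffer is allocated outside the Java heap, so the 64 MB below
            // does not add to GC workload the way 64 MB of on-heap objects would.
            ByteBuffer offHeap = ByteBuffer.allocateDirect(64 * 1024 * 1024);
            byte[] payload = "cached bytes living off-heap".getBytes(StandardCharsets.UTF_8);
            offHeap.put(payload);
            offHeap.flip();
            byte[] back = new byte[offHeap.remaining()];
            offHeap.get(back);
            System.out.println(new String(back, StandardCharsets.UTF_8));
        }
    }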
>>>>>> Disk failure is the "easy" case. The "hard" case, which is unfortunately also the much more common case, is disk misbehavior. Towards the end of their lives, disks tend to start slowing down unpredictably. Requests that would have completed immediately before start taking 20, 100, 500 milliseconds. Some files may be readable and other files may not be. System calls hang, sometimes forever, and the Java process can't abort them, because the hang is in the kernel. It is not fun when threads are stuck in "D state" (see http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state). Even kill -9 cannot abort the thread then. Fortunately, this is rare.
>>>>> I agree it is a harder problem, and it is rare. We probably don't have to worry about it in this KIP, since this issue is orthogonal to whether or not we use JBOD.
>>>>>> Another approach we should consider is for Kafka to implement its own storage layer that would stripe across multiple disks. This wouldn't have to be done at the block level, but could be done at the file level. We could use consistent hashing to determine which disks a file should end up on, for example.
>>>>> Are you suggesting that we should distribute the log, or log segments, across the disks of brokers? I am not sure I fully understand this approach. My gut feeling is that this would be a drastic solution requiring non-trivial design. While this may be useful to Kafka, I would prefer not to discuss it in detail in this thread unless you believe it is strictly superior to the design in this KIP in terms of solving our use case.
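To make the file-level striping idea a bit more concrete, a consistent-hash ring over a broker's log directories might look roughly like the sketch below. The class, method names and directory paths are invented for illustration; this is not a proposed design and not existing Kafka code.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Illustrative sketch only: each log directory gets several virtual nodes on a
    // hash ring, and a file key (e.g. "topicA-3/00000000000000000000.log") is mapped
    // to the first directory clockwise from its hash.
    public class DiskRing {
        private final TreeMap<Long, String> ring = new TreeMap<>();

        public DiskRing(List<String> logDirs, int virtualNodesPerDir) {
            for (String dir : logDirs) {
                for (int v = 0; v < virtualNodesPerDir; v++) {
                    ring.put(hash(dir + "#" + v), dir);
                }
            }
        }

        /** Returns the log directory that would hold the given file. */
        public String dirFor(String fileKey) {
            SortedMap<Long, String> tail = ring.tailMap(hash(fileKey));
            return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
        }

        private static long hash(String s) {
            try {
                byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
                long h = 0;
                for (int i = 0; i < 8; i++) {
                    h = (h << 8) | (d[i] & 0xffL);
                }
                return h;
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        }

        public static void main(String[] args) {
            DiskRing ring = new DiskRing(List.of("/mnt/disk0", "/mnt/disk1", "/mnt/disk2"), 100);
            System.out.println(ring.dirFor("topicA-3/00000000000000000000.log"));
        }
    }

A property that presumably makes consistent hashing attractive here is that adding or removing a directory only remaps the files whose hashes fall next to that directory's virtual nodes, rather than reshuffling everything.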
>>>>>> best,
>>>>>> Colin
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org> wrote:
>>>>>>>> Hi Dong,
>>>>>>>> Thanks for the writeup! It's very interesting.
>>>>>>>> I apologize in advance if this has been discussed somewhere else, but I am curious if you have considered the solution of running multiple brokers per node. Clearly there is a memory overhead with this solution because of the fixed cost of starting multiple JVMs. However, running multiple JVMs would help avoid scalability bottlenecks. You could probably push more RPCs per second, for example. A garbage collection in one broker would not affect the others. It would be interesting to see this considered in the "alternate designs" section, even if you end up deciding it's not the way to go.
>>>>>>>> best,
>>>>>>>> Colin
>>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>>>>>>>>> Hi all,
>>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find the KIP wiki at https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
>>>>>>>>> This KIP is related to KIP-113 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>: Support replicas movement between log directories. Both are needed in order to support JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!
>>>>>>>>> Thanks,
>>>>>>>>> Dong