Hi all,

Thank you all for the helpful suggestions. I have updated the KIP to address the comments received so far. See here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638402&selectedPageVersions=8&selectedPageVersions=9> to read the changes to the KIP. Here is a summary of the changes:
- Updated the Proposed Change section to change the recovery steps. After this change, the broker will also create a replica as long as all log directories are working.
- Removed kafka-log-dirs.sh from this KIP since users no longer need to use it to recover from bad disks.
- Explained how the znode controller_managed_state is managed in the Public Interfaces section.
- Explained what happens during controller failover, partition reassignment and topic deletion in the Proposed Change section.
- Updated the Future Work section to include the following potential improvements:
  - Let the broker notify the controller of ISR changes and disk state changes via RPC instead of using zookeeper.
  - Handle various failure scenarios (e.g. slow disks) on a case-by-case basis. For example, we may want to detect a slow disk and consider it offline.
  - Allow an admin to mark a directory as bad so that it will not be used.

Thanks,
Dong

On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Eno,
>
> Thanks much for the comment!
>
> I still think the complexity added to Kafka is justified by its benefit. Let me provide my reasons below.
>
> 1) The additional logic is easy to understand and thus its complexity should be reasonable.
>
> On the broker side, it needs to catch exceptions when accessing a log directory, mark the log directory and all its replicas as offline, notify the controller by writing to the zookeeper notification path, and specify the error in the LeaderAndIsrResponse. On the controller side, it will listen to zookeeper for disk failure notifications, learn about offline replicas from the LeaderAndIsrResponse, and take offline replicas into consideration when electing leaders. It also marks replicas as created in zookeeper and uses that to determine whether a replica has been created.
>
> That is all the logic we need to add to Kafka. I personally feel this is easy to reason about.
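To make point 1) above a bit more concrete, here is a rough sketch of the broker-side handling. It is only meant to illustrate the control flow; the class, method and znode names below are placeholders for this discussion, not the actual implementation in the KIP or in the prototype patch:

import java.io.IOException;
import java.util.Set;

// Rough sketch of the broker-side handling of a bad log directory.
// All class, method and znode names here are illustrative placeholders.
public class LogDirFailureHandler {

    interface LogManager { Set<String> markLogDirOffline(String logDir); }          // returns partitions on that dir
    interface ReplicaManager { void markReplicasOffline(Set<String> partitions); }  // stop serving those replicas
    interface ZkClient { void createPersistentSequential(String path, String data); }

    private final int brokerId;
    private final LogManager logManager;
    private final ReplicaManager replicaManager;
    private final ZkClient zkClient;

    public LogDirFailureHandler(int brokerId, LogManager logManager,
                                ReplicaManager replicaManager, ZkClient zkClient) {
        this.brokerId = brokerId;
        this.logManager = logManager;
        this.replicaManager = replicaManager;
        this.zkClient = zkClient;
    }

    // Invoked whenever a read or write on a log directory throws an IOException.
    // (Error logging of `cause` is omitted in this sketch.)
    public void handleLogDirFailure(String badLogDir, IOException cause) {
        // 1. Mark the directory and every replica hosted on it as offline so the
        //    broker stops reading from and writing to those replicas.
        Set<String> offlinePartitions = logManager.markLogDirOffline(badLogDir);
        replicaManager.markReplicasOffline(offlinePartitions);

        // 2. Notify the controller by creating a sequential znode under a
        //    notification path; the controller watches this path and elects new
        //    leaders for the affected partitions.
        zkClient.createPersistentSequential("/log_dir_event_notification/event_",
                                            brokerId + ":" + badLogDir);

        // 3. Later LeaderAndIsrResponses from this broker also carry an error code
        //    for any partition whose log is on an offline directory, so the
        //    controller learns about offline replicas even if it misses the
        //    znode notification.
    }
}

The znode-based notification is what the KIP currently proposes; as listed in the Future Work items above, it could later be replaced by a direct RPC from the broker to the controller.
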
> 2) The additional code is not much.
>
> I expect the code for KIP-112 to be around 1100 lines of new code. Previously I implemented a prototype of a slightly different design (see here <https://docs.google.com/document/d/1Izza0SBmZMVUBUt9s_-Dqi3D8e0KGJQYW8xgEdRsgAI/edit>) and uploaded it to github (see here <https://github.com/lindong28/kafka/tree/JBOD>). The patch changed 33 files, added 1185 lines and deleted 183 lines. The prototype patch is actually smaller than the patch for KIP-107 (see here <https://github.com/apache/kafka/pull/2476>), which is already accepted. The KIP-107 patch changed 49 files, added 1349 lines and deleted 141 lines.
>
> 3) Comparison with one-broker-per-multiple-volumes
>
> This KIP can improve the availability of Kafka in this case such that one failed volume doesn't bring down the entire broker.
>
> 4) Comparison with one-broker-per-volume
>
> If each volume maps to multiple disks, then we still have a similar problem: the broker will fail if any disk of the volume fails.
>
> If each volume maps to one disk, it means that we need to deploy 10 brokers on a machine if the machine has 10 disks. I will explain the concerns with this approach in order of importance.
>
> - It is awkward to tell Kafka users to deploy 50 brokers on a machine with 50 disks.
>
> - Whether users deploy Kafka on a commercial cloud platform or in their own cluster, the size of the largest disk is usually limited. There will be scenarios where users want to increase broker capacity by having multiple disks per broker. This JBOD KIP makes that feasible without hurting availability when a single disk fails.
>
> - Automatic load rebalancing across disks will be easier and more flexible if one broker has multiple disks. This can be future work.
>
> - There is a performance concern when you deploy 10 brokers vs. 1 broker on one machine. The metadata of the cluster, including FetchRequest, ProduceResponse, MetadataRequest and so on, will all be 10X more. The packets-per-second will be 10X higher, which may limit performance if pps is the bottleneck. The number of sockets on the machine is 10X higher. And the number of replication threads will be 100X more. The impact will be more significant with an increasing number of disks per machine. Thus it will limit Kafka's scalability in the long term.
>
> Thanks,
> Dong
>
> On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi Dong,
>>
>> To simplify the discussion today, on my part I'll zoom into one thing only:
>>
>> - I'll discuss the options I call "one-broker-per-disk" and "one-broker-per-few-disks" below.
>>
>> - I completely buy the JBOD vs RAID arguments, so there is no need to discuss that part for me. I buy it that JBODs are good.
>>
>> I find the terminology can be improved a bit. Ideally we'd be talking about volumes, not disks, just to make it clear that Kafka understands volumes/directories, not individual raw disks. So by "one-broker-per-few-disks" what I mean is that the admin can pool a few disks together to create a volume/directory and give that to Kafka.
>>
>> The kernel of my question is that the admin already has tools to 1) create volumes/directories from a JBOD, 2) start a broker on a desired machine, and 3) assign a broker resources like a directory. I claim that those tools are sufficient to optimise resource allocation. I understand that a broker could manage point 3) itself, i.e. juggle the directories. My question is whether the complexity added to Kafka is justified. Operationally it seems to me an admin will still have to do all three items above.
>>
>> Looking forward to the discussion
>> Thanks
>> Eno
>>
>> > On 1 Feb 2017, at 17:21, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > Hey Eno,
>> >
>> > Thanks much for the review.
>> >
>> > I think your suggestion is to split the disks of a machine into multiple disk sets and run one broker per disk set. Yeah, this is similar to Colin's suggestion of one-broker-per-disk, which we have evaluated at LinkedIn and consider to be a good short term approach.
>> >
>> > As of now I don't think any of these approaches is a better alternative in the long term. I will summarize the reasons here; I have also put them in the KIP's motivation section and rejected alternatives section. I am happy to discuss more, and I would certainly like to use an alternative solution that is easier to implement with better performance.
>> >
>> > - JBOD vs. RAID-10: if we switch from RAID-10 with replication-factor=2 to JBOD with replication-factor=3, we get a 25% reduction in disk usage (each message goes from 2x2=4 copies on disk to 3) and double the number of broker failures tolerated before data becomes unavailable, from 1 to 2. This is a pretty huge gain for any company that uses Kafka at large scale.
>> >
>> > - JBOD vs. one-broker-per-disk: The benefit of one-broker-per-disk is that no major code change is needed in Kafka.
>> > Among the disadvantages of one-broker-per-disk summarized in the KIP and in the previous emails with Colin, the biggest ones are the 15% throughput loss compared to JBOD and the reduced flexibility to balance load across disks. Further, it probably requires changes to internal deployment tools at various companies to deal with the one-broker-per-disk setup.
>> >
>> > - JBOD vs. RAID-0: This is the setup used at Microsoft. The problem is that a broker becomes unavailable if any disk fails. Suppose replication-factor=2 and there are 10 disks per machine. Then the probability that a given message becomes unavailable due to disk failure is 100X higher with RAID-0 than with JBOD (a RAID-0 broker is down if any of its 10 disks fails, so the chance that both brokers holding a message are down is roughly (10p)^2 for per-disk failure probability p, versus p^2 with JBOD, where only the 2 specific disks holding the message matter).
>> >
>> > - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disks is somewhere between one-broker-per-disk and RAID-0, so it carries an average of the disadvantages of those two approaches.
>> >
>> > To answer your question: I think it is reasonable to manage disks in Kafka. By "managing disks" we mean managing the assignment of replicas across disks. Here are my reasons in more detail:
>> >
>> > - I don't think this KIP is a big step change. By already allowing users to configure Kafka with multiple log directories or disks, it is implicit that Kafka manages disks; it is just not a complete feature. Microsoft and probably other companies are using this feature with the undesirable effect that a broker fails if any disk fails. It is good to complete this feature.
>> >
>> > - I think it is reasonable to manage disks in Kafka. One of the most important things Kafka does is determine the replica assignment across brokers and make sure enough copies of a given partition are available. I would argue that determining the replica assignment across disks is conceptually not much different.
>> >
>> > - I would agree that this KIP improves the performance of Kafka at the cost of more complexity inside Kafka, by switching from RAID-10 to JBOD. I would argue that this is the right direction. If we could gain 20%+ performance by managing NICs in Kafka compared to the existing approach and other alternatives, I would say we should just do it. Such a gain in performance, or equivalently a reduction in cost, can save millions of dollars per year for any company running Kafka at large scale.
>> >
>> > Thanks,
>> > Dong
>> >
>> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>> >
>> >> I'm coming somewhat late to the discussion, apologies for that.
>> >>
>> >> I'm worried about this proposal. It's moving Kafka to a world where it manages disks. So in a sense the scope of the KIP is limited, but the direction it sets for Kafka is quite a big step change. Fundamentally this is about balancing resources for a Kafka broker. This can be done by a tool, rather than by changing Kafka. E.g., the tool would take a bunch of disks, create a volume over them and export that to a Kafka broker (in addition to setting the memory limits for that broker or limiting other resources). A different bunch of disks can then make up a second volume and be used by another Kafka broker. This is aligned with what Colin is saying (as I understand it).
>> >>
>> >> Disks are not the only resource on a machine; there are several instances where multiple NICs are used, for example. Do we want fine grained management of all these resources? I'd argue that opens the system up to a lot of complexity.
>> >>
>> >> Thanks
>> >> Eno
>> >>
>> >>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote:
>> >>>
>> >>> Hi all,
>> >>>
>> >>> I am going to initiate the vote if there is no further concern with the KIP.
>> >>>
>> >>> Thanks,
>> >>> Dong
>> >>>
>> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai <radai.rosenbl...@gmail.com> wrote:
>> >>>
>> >>>> a few extra points:
>> >>>>
>> >>>> 1. broker per disk might also incur more client <--> broker sockets: suppose every producer / consumer "talks" to >1 partition, there's a very good chance that partitions that were co-located on a single 10-disk broker would now be split between several single-disk broker processes on the same machine. hard to put a multiplier on this, but likely >x1. sockets are a limited resource at the OS level and incur some memory cost (kernel buffers).
>> >>>>
>> >>>> 2. there's a memory overhead to spinning up a JVM (compiled code and byte code objects etc). if we assume this overhead is ~300 MB (order of magnitude, specifics vary) then spinning up 10 JVMs would lose you 3 GB of RAM. not a ton, but non-negligible.
>> >>>>
>> >>>> 3. there would also be some overhead downstream of kafka in any management / monitoring / log aggregation system. likely less than x10 though.
>> >>>>
>> >>>> 4. (related to above) - added complexity of administration with more running instances.
>> >>>>
>> >>>> is anyone running kafka with anywhere near 100GB heaps? i thought the point was to rely on the kernel page cache to do the disk buffering ....
>> >>>>
>> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindon...@gmail.com> wrote:
>> >>>>
>> >>>>> Hey Colin,
>> >>>>>
>> >>>>> Thanks much for the comment. Please see my comments inline.
>> >>>>>
>> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org> wrote:
>> >>>>>
>> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>> >>>>>>> Hey Colin,
>> >>>>>>>
>> >>>>>>> Good point! Yeah, we have actually considered and tested this solution, which we call one-broker-per-disk. It would work and should require no major change in Kafka as compared to this JBOD KIP, so it would be a good short term solution.
>> >>>>>>>
>> >>>>>>> But it has a few drawbacks which make it less desirable in the long term. Assume we have 10 disks on a machine. Here are the problems:
>> >>>>>>
>> >>>>>> Hi Dong,
>> >>>>>>
>> >>>>>> Thanks for the thoughtful reply.
>> >>>>>>
>> >>>>>>> 1) Our stress test result shows that one-broker-per-disk has 15% lower throughput.
>> >>>>>>>
>> >>>>>>> 2) The controller would need to send 10X as many LeaderAndIsrRequest, MetadataUpdateRequest and StopReplicaRequest. This increases the burden on the controller, which can become the performance bottleneck.
>> >>>>>>
>> >>>>>> Maybe I'm misunderstanding something, but there would not be 10x as many StopReplicaRequest RPCs, would there?
>> >>>>>> The other requests would increase 10x, but from a pretty low base, right? We are not reassigning partitions all the time, I hope (or else we have bigger problems...)
>> >>>>>
>> >>>>> I think the controller will group StopReplicaRequests per broker and send only one StopReplicaRequest to a broker during controlled shutdown. Anyway, we don't have to worry about this if we agree that the other requests will increase by 10X. One MetadataRequest is sent to each broker in the cluster every time there is a leadership change. I am not sure this is a real problem, but in theory this makes the overhead complexity O(number of brokers) and may be a concern in the future. Ideally we should avoid it.
>> >>>>>
>> >>>>>>> 3) Less efficient use of the physical resources on the machine. The number of sockets on each machine will increase by 10X. The number of connections between any two machines will increase by 100X.
>> >>>>>>>
>> >>>>>>> 4) Less efficient way to manage memory and quotas.
>> >>>>>>>
>> >>>>>>> 5) Rebalancing between disks/brokers on the same machine will be less efficient and less flexible. A broker has to read data from another broker on the same machine via a socket. It will also be harder to do automatic load balancing between disks on the same machine in the future.
>> >>>>>>>
>> >>>>>>> I will put this and the explanation in the rejected alternatives section. I have a few questions:
>> >>>>>>>
>> >>>>>>> - Can you explain why this solution can help avoid the scalability bottleneck? I actually think it will exacerbate the scalability problem due to 2) above.
>> >>>>>>> - Why could we push more RPCs with this solution?
>> >>>>>>
>> >>>>>> To really answer this question we'd have to take a deep dive into the locking of the broker and figure out how effectively it can parallelize truly independent requests. Almost every multithreaded process is going to have shared state, like shared queues or shared sockets, that is going to make scaling less than linear when you add disks or processors. (And clearly, another option is to improve that scalability, rather than going multi-process!)
>> >>>>>
>> >>>>> Yeah, I also think it is better to improve scalability inside the Kafka code if possible. I am not sure we currently have any scalability issue inside Kafka that cannot be removed without going multi-process.
>> >>>>>
>> >>>>>>> - It is true that a garbage collection in one broker would not affect the others. But that is after every broker only uses 1/10 of the memory. Can we be sure that this will actually help performance?
>> >>>>>>
>> >>>>>> The big question is, how much memory do Kafka brokers use now, and how much will they use in the future? Our experience in HDFS was that once you start getting more than 100-200GB Java heap sizes, full GCs start taking minutes to finish when using the standard JVMs. That alone is a good reason to go multi-process or consider storing more things off the Java heap.
>> >>>>>
>> >>>>> I see.
>> >>>>> Now I agree one-broker-per-disk should be more efficient in terms of GC, since each broker probably needs less than 1/10 of the memory available on a typical machine nowadays. I will remove this from the reasons for rejection.
>> >>>>>
>> >>>>>> Disk failure is the "easy" case. The "hard" case, which is unfortunately also the much more common case, is disk misbehavior. Towards the end of their lives, disks tend to start slowing down unpredictably. Requests that would have completed immediately before start taking 20, 100, 500 milliseconds. Some files may be readable and other files may not be. System calls hang, sometimes forever, and the Java process can't abort them, because the hang is in the kernel. It is not fun when threads are stuck in "D state" http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state . Even kill -9 cannot abort the thread then. Fortunately, this is rare.
>> >>>>>
>> >>>>> I agree it is a harder problem and it is rare. We probably don't have to worry about it in this KIP since this issue is orthogonal to whether or not we use JBOD.
>> >>>>>
>> >>>>>> Another approach we should consider is for Kafka to implement its own storage layer that would stripe across multiple disks. This wouldn't have to be done at the block level, but could be done at the file level. We could use consistent hashing to determine which disks a file should end up on, for example.
>> >>>>>
>> >>>>> Are you suggesting that we should distribute logs, or log segments, across the disks of brokers? I am not sure I fully understand this approach. My gut feeling is that this would be a drastic solution that would require non-trivial design. While this may be useful to Kafka, I would prefer not to discuss this in detail in this thread unless you believe it is strictly superior to the design in this KIP in terms of solving our use case.
>> >>>>>
>> >>>>>> best,
>> >>>>>> Colin
>> >>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Dong
>> >>>>>>>
>> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Dong,
>> >>>>>>>>
>> >>>>>>>> Thanks for the writeup! It's very interesting.
>> >>>>>>>>
>> >>>>>>>> I apologize in advance if this has been discussed somewhere else. But I am curious if you have considered the solution of running multiple brokers per node. Clearly there is a memory overhead with this solution because of the fixed cost of starting multiple JVMs. However, running multiple JVMs would help avoid scalability bottlenecks. You could probably push more RPCs per second, for example. A garbage collection in one broker would not affect the others. It would be interesting to see this considered in the "alternate designs" section, even if you end up deciding it's not the way to go.
>> >>>>>>>> best,
>> >>>>>>>> Colin
>> >>>>>>>>
>> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>> >>>>>>>>> Hi all,
>> >>>>>>>>>
>> >>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find the KIP wiki at https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
>> >>>>>>>>>
>> >>>>>>>>> This KIP is related to KIP-113 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>: Support replicas movement between log directories. They are both needed in order to support JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Dong
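
P.S. Regarding Colin's earlier suggestion of striping at the file level and using consistent hashing to decide which disk a file ends up on: just to make sure we are talking about the same thing, here is a minimal sketch of what picking a log directory could look like. This is purely illustrative (the class and method names are made up) and is not something this KIP proposes:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative consistent-hash ring that maps a partition or log file name to a
// log directory. Virtual nodes smooth out the distribution across directories.
public class LogDirRing {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    public LogDirRing(List<String> logDirs, int virtualNodesPerDir) {
        for (String dir : logDirs) {
            for (int i = 0; i < virtualNodesPerDir; i++) {
                ring.put(hash(dir + "#" + i), dir);
            }
        }
    }

    // Returns the directory owning the first point on the ring at or after the key's hash.
    public String dirFor(String key) {
        SortedMap<Long, String> tail = ring.tailMap(hash(key));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {          // use the first 8 bytes of the digest
                h = (h << 8) | (digest[i] & 0xffL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);    // MD5 is always available
        }
    }
}

With something like this, dirFor("topicA-0") deterministically picks one of the configured directories, and removing a failed directory from the ring only remaps the keys that were on it. Whether that machinery is worth it compared to the explicit replica-to-directory assignment in this KIP is exactly the question raised above, so I will leave it at that for this thread.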