I'm coming somewhat late to the discussion, apologies for that. I'm worried about this proposal: it moves Kafka towards a world where the broker itself manages disks. So while the scope of the KIP is limited, the direction it sets for Kafka is quite a big step change. Fundamentally this is about balancing resources for a Kafka broker, and that can be done by a tool rather than by changing Kafka. E.g., the tool would take a bunch of disks, create a volume over them and export that to a Kafka broker (in addition to setting the memory limits for that broker or limiting other resources). A different bunch of disks can then make up a second volume and be used by another Kafka broker. This is aligned with what Colin is saying (as I understand it).
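Concretely, the difference would show up in the broker configuration roughly as below; the mount points and the choice of volume manager are only illustrative, not part of either proposal:

```properties
# JBOD as proposed by the KIP: one broker is handed every disk directly
# (the list would continue through /mnt/disk10/kafka-logs on a 10-disk machine)
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs,/mnt/disk3/kafka-logs

# Alternative sketched above: an external tool (e.g. LVM or RAID) aggregates the
# same disks into a single volume, and the broker only ever sees one log directory
log.dirs=/mnt/kafka-volume/kafka-logs
```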
Disks are not the only resource on a machine; there are several instances where multiple NICs are used, for example. Do we want fine-grained management of all these resources? I'd argue that opens the system up to a lot of complexity.

Thanks
Eno

> On 1 Feb 2017, at 01:53, Dong Lin <[email protected]> wrote:
>
> Hi all,
>
> I am going to initiate the vote if there is no further concern with the KIP.
>
> Thanks,
> Dong
>
> On Fri, Jan 27, 2017 at 8:08 PM, radai <[email protected]> wrote:
>
>> a few extra points:
>>
>> 1. broker per disk might also incur more client <--> broker sockets: suppose every producer / consumer "talks" to >1 partition; there's a very good chance that partitions that were co-located on a single 10-disk broker would now be split between several single-disk broker processes on the same machine. hard to put a multiplier on this, but likely >x1. sockets are a limited resource at the OS level and incur some memory cost (kernel buffers)
>>
>> 2. there's a memory overhead to spinning up a JVM (compiled code and byte code objects etc). if we assume this overhead is ~300 MB (order of magnitude, specifics vary) then spinning up 10 JVMs would lose you 3 GB of RAM. not a ton, but non-negligible.
>>
>> 3. there would also be some overhead downstream of kafka in any management / monitoring / log aggregation system. likely less than x10 though.
>>
>> 4. (related to above) - added complexity of administration with more running instances.
>>
>> is anyone running kafka with anywhere near 100GB heaps? i thought the point was to rely on the kernel page cache to do the disk buffering ....
>>
>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <[email protected]> wrote:
>>
>>> Hey Colin,
>>>
>>> Thanks much for the comment. Please see my comments inline.
>>>
>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <[email protected]> wrote:
>>>
>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
>>>>> Hey Colin,
>>>>>
>>>>> Good point! Yeah we have actually considered and tested this solution, which we call one-broker-per-disk. It would work and should require no major change in Kafka as compared to this JBOD KIP. So it would be a good short-term solution.
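For a concrete picture of the one-broker-per-disk deployment being discussed here, each disk would get its own broker process and configuration, roughly along these lines (broker ids, ports and paths are made up for illustration):

```properties
# broker process 1 (disk 1), e.g. server-1.properties
broker.id=101
listeners=PLAINTEXT://host1:9092
log.dirs=/mnt/disk1/kafka-logs

# broker process 2 (disk 2), e.g. server-2.properties
broker.id=102
listeners=PLAINTEXT://host1:9093
log.dirs=/mnt/disk2/kafka-logs

# ...and so on: one JVM per disk, ten broker processes on a 10-disk machine
```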
>>>>> But it has a few drawbacks which make it less desirable in the long term. Assume we have 10 disks on a machine. Here are the problems:
>>>>
>>>> Hi Dong,
>>>>
>>>> Thanks for the thoughtful reply.
>>>>
>>>>> 1) Our stress test result shows that one-broker-per-disk has 15% lower throughput.
>>>>>
>>>>> 2) Controller would need to send 10X as many LeaderAndIsrRequest, MetadataUpdateRequest and StopReplicaRequest. This increases the burden on the controller, which can be the performance bottleneck.
>>>>
>>>> Maybe I'm misunderstanding something, but there would not be 10x as many StopReplicaRequest RPCs, would there? The other requests would increase 10x, but from a pretty low base, right? We are not reassigning partitions all the time, I hope (or else we have bigger problems...)
>>>
>>> I think the controller will group StopReplicaRequest per broker and send only one StopReplicaRequest to a broker during controlled shutdown. Anyway, we don't have to worry about this if we agree that other requests will increase by 10X. One MetadataRequest is sent to each broker in the cluster every time there is a leadership change. I am not sure this is a real problem. But in theory this makes the overhead complexity O(number of brokers) and may be a concern in the future. Ideally we should avoid it.
>>>
>>>>> 3) Less efficient use of physical resources on the machine. The number of sockets on each machine will increase by 10X. The number of connections between any two machines will increase by 100X (each of the 10 brokers on one machine may connect to each of the 10 brokers on the other).
>>>>>
>>>>> 4) Less efficient way to manage memory and quota.
>>>>>
>>>>> 5) Rebalance between disks/brokers on the same machine will be less efficient and less flexible. A broker has to read data from another broker on the same machine via a socket. It is also harder to do automatic load balancing between disks on the same machine in the future.
>>>>>
>>>>> I will put this and the explanation in the rejected alternatives section. I have a few questions:
>>>>>
>>>>> - Can you explain why this solution can help avoid scalability bottlenecks? I actually think it will exacerbate the scalability problem due to 2) above.
>>>>> - Why can we push more RPCs with this solution?
>>>>
>>>> To really answer this question we'd have to take a deep dive into the locking of the broker and figure out how effectively it can parallelize truly independent requests. Almost every multithreaded process is going to have shared state, like shared queues or shared sockets, that is going to make scaling less than linear when you add disks or processors. (And clearly, another option is to improve that scalability, rather than going multi-process!)
>>>
>>> Yeah, I also think it is better to improve scalability inside the Kafka code if possible. I am not sure we currently have any scalability issue inside Kafka that cannot be fixed without going multi-process.
>>>
>>>>> - It is true that a garbage collection in one broker would not affect others. But that is after every broker only uses 1/10 of the memory. Can we be sure that this will actually help performance?
>>>>
>>>> The big question is, how much memory do Kafka brokers use now, and how much will they use in the future? Our experience in HDFS was that once you start getting more than 100-200GB Java heap sizes, full GCs start taking minutes to finish when using the standard JVMs. That alone is a good reason to go multi-process or consider storing more things off the Java heap.
>>>
>>> I see. Now I agree one-broker-per-disk should be more efficient in terms of GC, since each broker probably needs less than 1/10 of the memory available on a typical machine nowadays. I will remove this from the reasons for rejection.
>>>
>>>> Disk failure is the "easy" case. The "hard" case, which is unfortunately also the much more common case, is disk misbehavior. Towards the end of their lives, disks tend to start slowing down unpredictably. Requests that would have completed immediately before start taking 20, 100, 500 milliseconds. Some files may be readable and other files may not be. System calls hang, sometimes forever, and the Java process can't abort them, because the hang is in the kernel. It is not fun when threads are stuck in "D state": http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state. Even kill -9 cannot abort the thread then. Fortunately, this is rare.
>>>
>>> I agree it is a harder problem and it is rare. We probably don't have to worry about it in this KIP since this issue is orthogonal to whether or not we use JBOD.
>>>
>>>> Another approach we should consider is for Kafka to implement its own storage layer that would stripe across multiple disks. This wouldn't have to be done at the block level, but could be done at the file level. We could use consistent hashing to determine which disks a file should end up on, for example.
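As a rough illustration of the file-level placement Colin is describing (a made-up sketch, not existing Kafka code; the class and method names are hypothetical), a consistent-hash ring over log directories could look something like this:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.TreeMap;

// Sketch only: place files onto log directories with a consistent-hash ring,
// so that adding or losing a directory only remaps the files that map to it.
public class LogDirRing {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    public LogDirRing(List<String> logDirs, int virtualNodesPerDir) {
        for (String dir : logDirs) {
            for (int i = 0; i < virtualNodesPerDir; i++) {
                // Several points on the ring per directory smooths out the distribution.
                ring.put(hash(dir + "#" + i), dir);
            }
        }
    }

    // Returns the log directory that should hold the given file name.
    public String dirFor(String fileName) {
        Long key = ring.ceilingKey(hash(fileName));
        if (key == null) {
            key = ring.firstKey();  // wrap around the ring
        }
        return ring.get(key);
    }

    // 64-bit hash taken from the first 8 bytes of an MD5 digest.
    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xffL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);  // MD5 is always available in the JDK
        }
    }

    public static void main(String[] args) {
        LogDirRing ring = new LogDirRing(
                List.of("/mnt/disk1", "/mnt/disk2", "/mnt/disk3"), 100);
        System.out.println(ring.dirFor("my-topic-0/00000000000000000000.log"));
    }
}
```

With many virtual nodes per directory the files spread roughly evenly, and removing one directory from the ring only remaps the files that lived on it.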
>>> Are you suggesting that we should distribute a log, or log segment, across disks of brokers? I am not sure if I fully understand this approach. My gut feeling is that this would be a drastic solution that would require non-trivial design. While this may be useful to Kafka, I would prefer not to discuss this in detail in this thread unless you believe it is strictly superior to the design in this KIP in terms of solving our use-case.
>>>
>>>> best,
>>>> Colin
>>>>
>>>>> Thanks,
>>>>> Dong
>>>>>
>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <[email protected]> wrote:
>>>>>
>>>>>> Hi Dong,
>>>>>>
>>>>>> Thanks for the writeup! It's very interesting.
>>>>>>
>>>>>> I apologize in advance if this has been discussed somewhere else. But I am curious if you have considered the solution of running multiple brokers per node. Clearly there is a memory overhead with this solution because of the fixed cost of starting multiple JVMs. However, running multiple JVMs would help avoid scalability bottlenecks. You could probably push more RPCs per second, for example. A garbage collection in one broker would not affect the others. It would be interesting to see this considered in the "alternate designs" section, even if you end up deciding it's not the way to go.
>>>>>>
>>>>>> best,
>>>>>> Colin
>>>>>>
>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find the KIP wiki at https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
>>>>>>>
>>>>>>> This KIP is related to KIP-113 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>: Support replicas movement between log directories. Both are needed in order to support JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dong
