Hi Dong,

The KIP meetings are traditionally held at 11am. Would that also work, i.e. Tuesday the 7th at 11am?
Thanks Eno > On 2 Feb 2017, at 02:53, Dong Lin <lindon...@gmail.com> wrote: > > Hey Eno, Colin, > > Would you have time next Tuesday morning to discuss the KIP? How about 10 - > 11 am? > > To make best use of our time, can you please invite one or more committer > from Confluent to join the meeting? I hope the KIP can receive one or more > +1 from committer at Confluent if we have no concern the KIP after the KIP > meeting. > > In the meeting time, please feel free to provide comment in the thread so > that discussion in the KIP meeting can be more efficient. > > Thanks, > Dong > > On Wed, Feb 1, 2017 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote: > >> Hey Colin, >> >> Thanks much for the comment. Please see my reply inline. >> >> On Wed, Feb 1, 2017 at 1:54 PM, Colin McCabe <cmcc...@apache.org> wrote: >> >>> On Wed, Feb 1, 2017, at 11:35, Dong Lin wrote: >>>> Hey Grant, Colin, >>>> >>>> My bad, I misunderstood Grant's suggestion initially. Indeed this is a >>>> very >>>> interesting idea to just wait for replica.max.lag.ms for the replica on >>>> the >>>> bad disk to drop out of ISR instead of having broker actively reporting >>>> this to the controller. >>>> >>>> I have several concerns with this approach. >>>> >>>> - Broker needs to maintain persist the information of all partitions >>> that >>>> it has created, in a file in each disk. This is needed for broker to >>> know >>>> the replicas already created on the bad disks that it can not access. If >>>> we >>>> don't do it, then controller sends LeaderAndIsrRequest to a broker to >>>> become follower for a partition on the bad disk, the broker will create >>>> partition on a good disk. The good disks may be overloaded as cascading >>>> effect. >>>> >>>> While it is possible to let broker keep track of the replicas that it >>> has >>>> created, I think it is less clean than the approach in the current KIP >>>> for >>>> reason provided in the rejective alternative section. >>>> >>>> - Change is needed in the controller logic to handle failure to make a >>>> broker as leader when controller receives LeaderAndIsrResponse. >>>> Otherwise, >>>> things go wrong if partition on the bad disk is requested to become >>>> leader. >>>> As of now, broker doesn't handle error in LeaderAndIsrResponse. >>>> >>>> - We still need tools and mechanism for administrator to know whether a >>>> replica is offline due to bad disk. I am worried that asking >>>> administrator >>>> to log into a machine and get this information in the log is not >>> scalable >>>> when the broker number is large. Although each company can develop their >>>> internal tools to get this information, it is a waste of developer time >>>> to >>>> reinvent the wheel. Reading this information in the log also seems less >>>> reliable then getting it from Kafka request/response. >>>> >>>> I guess the goal of this alternative approach is to avoid making major >>>> change in Kafka at the cost of increased disk failure discovery time >>> etc. >>>> But I think the changes required for fixing the problems above won't be >>>> much less. >>> >>> Thanks for the thoughtful replies, Dong L. >>> >>> Instead of having an "offline" state, how about having a "creating" >>> state for replicas and a "created" state? Then if a replica was not >>> accessible on any disk, but still in "created" state, the broker could >>> know that something had gone wrong. 
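A small illustrative sketch of the "created" flag Colin describes. None of these class or method names exist in Kafka, and the KIP itself keeps the created flag in ZooKeeper rather than on the broker; the sketch only shows the check the flag enables: on startup the broker compares the replicas recorded as created against what it can actually find in its accessible log directories, and anything missing is reported as offline rather than silently re-created on a healthy disk.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "created" state idea; not actual Kafka code.
public class CreatedReplicaCheck {

    /**
     * @param createdReplicas     replicas this broker is recorded as having created
     *                            (in the KIP this record lives in ZooKeeper)
     * @param replicasFoundOnDisk replicas actually found per accessible log directory
     * @return replicas that should exist but were not found on any accessible
     *         log directory, i.e. candidates to report as offline
     */
    public static Set<String> findMissingReplicas(Set<String> createdReplicas,
                                                  Map<String, Set<String>> replicasFoundOnDisk) {
        Set<String> found = new HashSet<>();
        replicasFoundOnDisk.values().forEach(found::addAll);

        Set<String> missing = new HashSet<>(createdReplicas);
        missing.removeAll(found);
        // Missing replicas are *not* re-created on the remaining good disks,
        // which avoids the cascading-load problem Dong describes above.
        return missing;
    }
}

In other words, the created flag lets the broker distinguish "this replica should exist here but I cannot see it" from "this replica was never created here".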
This also would catch issues like >>> the broker being started without all log directories configured, or >>> disks not being correctly mounted at the expected mount points, leading >>> to empty directories. >>> >> >> Indeed, we need to have an additional state per replica to solve this >> problem. The current KIP design addresses the problem by putting the >> "created" state in zookeeper, as you can see in the public interface change >> of the KIP. Are you suggesting to solve the problem by storing this >> information in local disk of the broker instead of zookeeper? I have two >> concerns with this approach: >> >> - It requires broker to keep track of the replicas it has created. This >> solution will split the task of determining offline replicas among >> controller and brokers as opposed to the current Kafka design, where the >> controller determines states of replicas and propagate this information to >> brokers. We think it is less error-prone to still let controller be the >> only entity that maintains metadata (e.g. replica state) of Kafka cluster. >> >> - If we store this information in local disk, then we need to have >> additional request/response protocol in order to request broker to reset >> this information, e.g. after a bad disk is replaced with good disk, so that >> the replica can be re-created on a good disk. Things would be easier if we >> store this information in zookeeper. >> >> >>> >>>> >>>> To answer Colin's questions: >>>> >>>> - There is no action required on the side of administrator in case of >>> log >>>> directory failure. >>>> >>>> - Broker itself is going to discover log directory failure and declare >>>> offline replicas. Broker doesn't explicitly declare log directory >>>> failure. >>>> But administrator can learn from the MetadataResponse that replica is >>>> offline due to disk failure, i.e. if replica is offline but broker is >>>> online. >>> >>> Can you expand on this a little bit? It sounds like you are considering >>> dealing with failures on a replica-by-replica basis, rather than a >>> disk-by-disk basis. But it's disks that fail, not really individual >>> files or directories on disks. This decision interacts poorly with the >>> lack of a periodic scanner. It's easy to imagine a scenario where an >>> infrequently used replica sits on a dead disk for a long time without us >>> declaring it dead. >>> >> >> Sure. The broker will fail deal with failures on a disk-by-disk basis. All >> replicas on a disk to be failed if there is any disk related exception when >> broker accesses that disk. It means that if any replica on that broker can >> not be read, then all replicas on that disk are considered offline. Since >> controller doesn't know the disk names of replicas, it has to learn the >> liveness of replicas on a replica-by-replica basis in order to do leader >> election. >> >> Besides, I don't think we will have problem with the scenario you >> described. If a replica is indeed not touched for a long time, then it >> doesn't matter whether it is considered dead or not. The moment it is >> needed, either for read or for write, the KIP makes sure that we know its >> state and make leader election accordingly. >> >> >>>> >>>> - This KIP does not handle cases where a few disks on a broker are full, >>>> but the others have space. If a disk is full and can not be written then >>>> the disk is considered to have failed. The imbalance across disks is an >>>> existing problem and will be handled in KIP-113. >>> >>> OK. 
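A rough sketch of the disk-by-disk handling described above: an I/O error on one log directory takes every replica on that directory offline, while the broker keeps serving its other directories instead of crashing. The names (LogDirFailureTracker, handleIOError) are hypothetical and only illustrate the idea, not the KIP's actual interfaces.

import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of disk-by-disk failure handling; not actual Kafka code.
public class LogDirFailureTracker {

    // log directory -> topic-partitions hosted on it
    private final Map<Path, Set<String>> replicasByDir = new ConcurrentHashMap<>();
    private final Set<Path> offlineDirs = ConcurrentHashMap.newKeySet();

    public void register(Path logDir, Set<String> replicas) {
        replicasByDir.put(logDir, replicas);
    }

    /**
     * Called when any disk-related exception is seen while accessing logDir.
     * Instead of exiting the broker (the pre-KIP behaviour), the whole
     * directory is marked offline, so every replica on that disk is treated
     * as offline while replicas on the remaining disks keep being served.
     */
    public Set<String> handleIOError(Path logDir, IOException cause) {
        offlineDirs.add(logDir);
        Set<String> offline = replicasByDir.getOrDefault(logDir, Set.of());
        System.err.printf("Log dir %s is offline (%s); %d replicas affected%n",
                logDir, cause, offline.size());
        // The controller would subsequently learn these replicas are offline
        // and elect new leaders for any partitions they were leading.
        return offline;
    }

    public boolean isOnline(Path logDir) {
        return !offlineDirs.contains(logDir);
    }
}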
>>> >>>> >>>> - This KIP does not do a disk scanner that will periodically check for >>>> error conditions. It doesn't handle any performance degradation of >>> disks. >>>> We wait for a failure to happen before declaring a disk bad. >>>> >>>> Yes, this KIP requires us to fix cases in the code where we are >>>> suppressing >>>> disk errors or ignoring their root cause. But decision of which >>> Exception >>>> should be considered disk failure and how to handle each of these are >>>> more >>>> like implementation detail. I hope we can focus on the detail and high >>>> level idea of this KIP and only worry about specific exception when the >>>> patch is being reviewed. >>> >>> Hmm... I think we should discuss how we are going to harden the code >>> against disk failures, and verify that it has been hardened. Or maybe >>> we could do this in a follow-up KIP. >>> >> >> By "harden the code against disk errors" do you mean that we should make a >> full list of disk-related exception we may see and decide if we should >> treat each of these differently? I agree it is useful in the long term. But >> this is actually out of the scope of this KIP. The KIP will re-use the >> existing KafkaStorageException without having to change what exceptions are >> considered KafkaStorageException. The goal is to fail replicas on a disk >> instead of crashing the broker when KafkaStorageException is observed. >> >> Thanks, >> Dong >> >> >>>> After all we probably only know the list of >>>> exceptions and ways to handle them when we start to implement the KIP. >>>> And >>>> we need to improve this list over time as we discover various failure in >>>> the deployment. >>>> >>>> >>>> Hey Eno, >>>> >>>> Sure thing. Thanks for offering time to have a KIP meeting to discuss >>>> this. >>>> I will ask other Kafka developer at LinkedIn about their availability. >>> >>> Yeah, it would be nice to talk about this. >> >> >>> regards, >>> Colin >>> >>> >>>> >>>> Thanks, >>>> Dong >>>> >>>> >>>> On Wed, Feb 1, 2017 at 10:37 AM, Eno Thereska <eno.there...@gmail.com> >>>> wrote: >>>> >>>>> Hi Dong, >>>>> >>>>> Would it make sense to do a discussion over video/voice about this? I >>>>> think it's sufficiently complex that we can probably make quicker >>> progress >>>>> that way? So shall we do a KIP meeting soon? I can do this week >>> (Thu/Fri) >>>>> or next week. >>>>> >>>>> Thanks >>>>> Eno >>>>>> On 1 Feb 2017, at 18:29, Colin McCabe <cmcc...@apache.org> wrote: >>>>>> >>>>>> Hmm. Maybe I misinterpreted, but I got the impression that Grant >>> was >>>>>> suggesting that we avoid introducing this concept of "offline >>> replicas" >>>>>> for now. Is that feasible? >>>>>> >>>>>> What is the strategy for declaring a log directory bad? Is it an >>>>>> administrative action? Or is the broker itself going to be >>> responsible >>>>>> for this? How do we handle cases where a few disks on a broker are >>>>>> full, but the others have space? >>>>>> >>>>>> Are we going to have a disk scanner that will periodically check for >>>>>> error conditions (similar to the background checks that RAID >>> controllers >>>>>> do)? Or will we wait for a failure to happen before declaring a >>> disk >>>>>> bad? >>>>>> >>>>>> It seems to me that if we want this to work well we will need to fix >>>>>> cases in the code where we are suppressing disk errors or ignoring >>> their >>>>>> root cause. 
For example, any place where we are using the old Java >>> APIs >>>>>> that just return a boolean on failure will need to be fixed, since >>> the >>>>>> failure could now be disk full, permission denied, or IOE, and we >>> will >>>>>> need to handle those cases differently. Also, we will need to >>> harden >>>>>> the code against disk errors. Formerly it was OK to just crash on a >>>>>> disk error; now it is not. It would be nice to see more in the test >>>>>> plan about injecting IOExceptions into disk handling code and >>> verifying >>>>>> that we can handle it correctly. >>>>>> >>>>>> regards, >>>>>> Colin >>>>>> >>>>>> >>>>>> On Wed, Feb 1, 2017, at 10:02, Dong Lin wrote: >>>>>>> Hey Grant, >>>>>>> >>>>>>> Yes, this KIP does exactly what you described:) >>>>>>> >>>>>>> Thanks, >>>>>>> Dong >>>>>>> >>>>>>> On Wed, Feb 1, 2017 at 9:45 AM, Grant Henke <ghe...@cloudera.com> >>>>> wrote: >>>>>>> >>>>>>>> Hi Dong, >>>>>>>> >>>>>>>> Thanks for putting this together. >>>>>>>> >>>>>>>> Since we are discussing alternative/simplified options. Have you >>>>> considered >>>>>>>> handling the disk failures broker side to prevent a crash, >>> marking the >>>>> disk >>>>>>>> as "bad" to that individual broker, and continuing as normal? I >>>>> imagine the >>>>>>>> broker would then fall out of sync for the replicas hosted on the >>> bad >>>>> disk >>>>>>>> and the ISR would shrink. This would allow people using min.isr >>> to keep >>>>>>>> their data safe and the cluster operators would see a shrink in >>> many >>>>> ISRs >>>>>>>> and hopefully an obvious log message leading to a quick fix. I >>> haven't >>>>>>>> thought through this idea in depth though. So there could be some >>>>>>>> shortfalls. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Grant >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Feb 1, 2017 at 11:21 AM, Dong Lin <lindon...@gmail.com> >>> wrote: >>>>>>>> >>>>>>>>> Hey Eno, >>>>>>>>> >>>>>>>>> Thanks much for the review. >>>>>>>>> >>>>>>>>> I think your suggestion is to split disks of a machine into >>> multiple >>>>> disk >>>>>>>>> sets and run one broker per disk set. Yeah this is similar to >>> Colin's >>>>>>>>> suggestion of one-broker-per-disk, which we have evaluated at >>> LinkedIn >>>>>>>> and >>>>>>>>> considered it to be a good short term approach. >>>>>>>>> >>>>>>>>> As of now I don't think any of these approach is a better >>> alternative >>>>> in >>>>>>>>> the long term. I will summarize these here. I have put these >>> reasons >>>>> in >>>>>>>> the >>>>>>>>> KIP's motivation section and rejected alternative section. I am >>> happy >>>>> to >>>>>>>>> discuss more and I would certainly like to use an alternative >>> solution >>>>>>>> that >>>>>>>>> is easier to do with better performance. >>>>>>>>> >>>>>>>>> - JBOD vs. RAID-10: if we switch from RAID-10 with >>>>> replication-factoer=2 >>>>>>>> to >>>>>>>>> JBOD with replicatio-factor=3, we get 25% reduction in disk >>> usage and >>>>>>>>> doubles the tolerance of broker failure before data >>> unavailability >>>>> from 1 >>>>>>>>> to 2. This is pretty huge gain for any company that uses Kafka at >>>>> large >>>>>>>>> scale. >>>>>>>>> >>>>>>>>> - JBOD vs. one-broker-per-disk: The benefit of >>> one-broker-per-disk is >>>>>>>> that >>>>>>>>> no major code change is needed in Kafka. Among the disadvantage >>> of >>>>>>>>> one-broker-per-disk summarized in the KIP and previous email with >>>>> Colin, >>>>>>>>> the biggest one is the 15% throughput loss compared to JBOD and >>> less >>>>>>>>> flexibility to balance across disks. 
Further, it probably >>> requires >>>>> change >>>>>>>>> to internal deployment tools at various companies to deal with >>>>>>>>> one-broker-per-disk setup. >>>>>>>>> >>>>>>>>> - JBOD vs. RAID-0: This is the setup that used at Microsoft. The >>>>> problem >>>>>>>> is >>>>>>>>> that a broker becomes unavailable if any disk fail. Suppose >>>>>>>>> replication-factor=2 and there are 10 disks per machine. Then the >>>>>>>>> probability of of any message becomes unavailable due to disk >>> failure >>>>>>>> with >>>>>>>>> RAID-0 is 100X higher than that with JBOD. >>>>>>>>> >>>>>>>>> - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disk is >>>>> somewhere >>>>>>>>> between one-broker-per-disk and RAID-0. So it carries an averaged >>>>>>>>> disadvantages of these two approaches. >>>>>>>>> >>>>>>>>> To answer your question regarding, I think it is reasonable to >>> mange >>>>> disk >>>>>>>>> in Kafka. By "managing disks" we mean the management of >>> assignment of >>>>>>>>> replicas across disks. Here are my reasons in more detail: >>>>>>>>> >>>>>>>>> - I don't think this KIP is a big step change. By allowing user >>> to >>>>>>>>> configure Kafka to run multiple log directories or disks as of >>> now, >>>>> it is >>>>>>>>> implicit that Kafka manages disks. It is just not a complete >>> feature. >>>>>>>>> Microsoft and probably other companies are using this feature >>> under >>>>> the >>>>>>>>> undesirable effect that a broker will fail any if any disk fail. >>> It is >>>>>>>> good >>>>>>>>> to complete this feature. >>>>>>>>> >>>>>>>>> - I think it is reasonable to manage disk in Kafka. One of the >>> most >>>>>>>>> important work that Kafka is doing is to determine the replica >>>>> assignment >>>>>>>>> across brokers and make sure enough copies of a given replica is >>>>>>>> available. >>>>>>>>> I would argue that it is not much different than determining the >>>>> replica >>>>>>>>> assignment across disk conceptually. >>>>>>>>> >>>>>>>>> - I would agree that this KIP is improve performance of Kafka at >>> the >>>>> cost >>>>>>>>> of more complexity inside Kafka, by switching from RAID-10 to >>> JBOD. I >>>>>>>> would >>>>>>>>> argue that this is a right direction. If we can gain 20%+ >>> performance >>>>> by >>>>>>>>> managing NIC in Kafka as compared to existing approach and other >>>>>>>>> alternatives, I would say we should just do it. Such a gain in >>>>>>>> performance, >>>>>>>>> or equivalently reduction in cost, can save millions of dollars >>> per >>>>> year >>>>>>>>> for any company running Kafka at large scale. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Dong >>>>>>>>> >>>>>>>>> >>>>>>>>> On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska < >>> eno.there...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> I'm coming somewhat late to the discussion, apologies for that. >>>>>>>>>> >>>>>>>>>> I'm worried about this proposal. It's moving Kafka to a world >>> where >>>>> it >>>>>>>>>> manages disks. So in a sense, the scope of the KIP is limited, >>> but >>>>> the >>>>>>>>>> direction it sets for Kafka is quite a big step change. >>> Fundamentally >>>>>>>>> this >>>>>>>>>> is about balancing resources for a Kafka broker. This can be >>> done by >>>>> a >>>>>>>>>> tool, rather than by changing Kafka. E.g., the tool would take a >>>>> bunch >>>>>>>> of >>>>>>>>>> disks together, create a volume over them and export that to a >>> Kafka >>>>>>>>> broker >>>>>>>>>> (in addition to setting the memory limits for that broker or >>> limiting >>>>>>>>> other >>>>>>>>>> resources). 
A different bunch of disks can then make up a second >>>>>>>> volume, >>>>>>>>>> and be used by another Kafka broker. This is aligned with what >>> Colin >>>>> is >>>>>>>>>> saying (as I understand it). >>>>>>>>>> >>>>>>>>>> Disks are not the only resource on a machine, there are several >>>>>>>> instances >>>>>>>>>> where multiple NICs are used for example. Do we want fine >>> grained >>>>>>>>>> management of all these resources? I'd argue that opens us the >>> system >>>>>>>> to >>>>>>>>> a >>>>>>>>>> lot of complexity. >>>>>>>>>> >>>>>>>>>> Thanks >>>>>>>>>> Eno >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi all, >>>>>>>>>>> >>>>>>>>>>> I am going to initiate the vote If there is no further concern >>> with >>>>>>>> the >>>>>>>>>> KIP. >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> Dong >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Fri, Jan 27, 2017 at 8:08 PM, radai < >>> radai.rosenbl...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> a few extra points: >>>>>>>>>>>> >>>>>>>>>>>> 1. broker per disk might also incur more client <--> broker >>>>> sockets: >>>>>>>>>>>> suppose every producer / consumer "talks" to >1 partition, >>> there's >>>>> a >>>>>>>>>> very >>>>>>>>>>>> good chance that partitions that were co-located on a single >>>>> 10-disk >>>>>>>>>> broker >>>>>>>>>>>> would now be split between several single-disk broker >>> processes on >>>>>>>> the >>>>>>>>>> same >>>>>>>>>>>> machine. hard to put a multiplier on this, but likely >x1. >>> sockets >>>>>>>>> are a >>>>>>>>>>>> limited resource at the OS level and incur some memory cost >>> (kernel >>>>>>>>>>>> buffers) >>>>>>>>>>>> >>>>>>>>>>>> 2. there's a memory overhead to spinning up a JVM (compiled >>> code >>>>> and >>>>>>>>>> byte >>>>>>>>>>>> code objects etc). if we assume this overhead is ~300 MB >>> (order of >>>>>>>>>>>> magnitude, specifics vary) than spinning up 10 JVMs would >>> lose you >>>>> 3 >>>>>>>>> GB >>>>>>>>>> of >>>>>>>>>>>> RAM. not a ton, but non negligible. >>>>>>>>>>>> >>>>>>>>>>>> 3. there would also be some overhead downstream of kafka in >>> any >>>>>>>>>> management >>>>>>>>>>>> / monitoring / log aggregation system. likely less than x10 >>> though. >>>>>>>>>>>> >>>>>>>>>>>> 4. (related to above) - added complexity of administration >>> with >>>>> more >>>>>>>>>>>> running instances. >>>>>>>>>>>> >>>>>>>>>>>> is anyone running kafka with anywhere near 100GB heaps? i >>> thought >>>>>>>> the >>>>>>>>>> point >>>>>>>>>>>> was to rely on kernel page cache to do the disk buffering .... >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin < >>> lindon...@gmail.com> >>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hey Colin, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks much for the comment. Please see me comment inline. >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe < >>>>> cmcc...@apache.org >>>>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote: >>>>>>>>>>>>>>> Hey Colin, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Good point! Yeah we have actually considered and tested >>> this >>>>>>>>>>>> solution, >>>>>>>>>>>>>>> which we call one-broker-per-disk. It would work and should >>>>>>>> require >>>>>>>>>>>> no >>>>>>>>>>>>>>> major change in Kafka as compared to this JBOD KIP. So it >>> would >>>>>>>> be >>>>>>>>> a >>>>>>>>>>>>> good >>>>>>>>>>>>>>> short term solution. 
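As a back-of-envelope check on the JBOD vs. RAID-0 comparison in Dong's reply above, assuming independent disk failures and a small per-disk failure probability p: with JBOD and replication-factor 2, a partition is unavailable only if the two specific disks holding its replicas both fail, roughly p^2. With RAID-0 over 10 disks, a broker is down if any one of its 10 disks fails, roughly 10p, so both hosting brokers being down is roughly (10p)^2 = 100 * p^2. That is where the quoted 100X factor comes from; it ignores correlated failures and rebuild windows, so it is an approximation rather than an exact figure.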
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> But it has a few drawbacks which makes it less desirable >>> in the >>>>>>>>> long >>>>>>>>>>>>>>> term. >>>>>>>>>>>>>>> Assume we have 10 disks on a machine. Here are the >>> problems: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Dong, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the thoughtful reply. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1) Our stress test result shows that one-broker-per-disk >>> has 15% >>>>>>>>>>>> lower >>>>>>>>>>>>>>> throughput >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2) Controller would need to send 10X as many >>>>> LeaderAndIsrRequest, >>>>>>>>>>>>>>> MetadataUpdateRequest and StopReplicaRequest. This >>> increases the >>>>>>>>>>>> burden >>>>>>>>>>>>>>> on >>>>>>>>>>>>>>> controller which can be the performance bottleneck. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Maybe I'm misunderstanding something, but there would not >>> be 10x >>>>>>>> as >>>>>>>>>>>> many >>>>>>>>>>>>>> StopReplicaRequest RPCs, would there? The other requests >>> would >>>>>>>>>>>> increase >>>>>>>>>>>>>> 10x, but from a pretty low base, right? We are not >>> reassigning >>>>>>>>>>>>>> partitions all the time, I hope (or else we have bigger >>>>>>>> problems...) >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> I think the controller will group StopReplicaRequest per >>> broker >>>>> and >>>>>>>>>> send >>>>>>>>>>>>> only one StopReplicaRequest to a broker during controlled >>>>> shutdown. >>>>>>>>>>>> Anyway, >>>>>>>>>>>>> we don't have to worry about this if we agree that other >>> requests >>>>>>>>> will >>>>>>>>>>>>> increase by 10X. One MetadataRequest to send to each broker >>> in the >>>>>>>>>>>> cluster >>>>>>>>>>>>> every time there is leadership change. I am not sure this is >>> a >>>>> real >>>>>>>>>>>>> problem. But in theory this makes the overhead complexity >>> O(number >>>>>>>> of >>>>>>>>>>>>> broker) and may be a concern in the future. Ideally we should >>>>> avoid >>>>>>>>> it. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 3) Less efficient use of physical resource on the machine. >>> The >>>>>>>>> number >>>>>>>>>>>>> of >>>>>>>>>>>>>>> socket on each machine will increase by 10X. The number of >>>>>>>>> connection >>>>>>>>>>>>>>> between any two machine will increase by 100X. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 4) Less efficient way to management memory and quota. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 5) Rebalance between disks/brokers on the same machine >>> will less >>>>>>>>>>>>>>> efficient >>>>>>>>>>>>>>> and less flexible. Broker has to read data from another >>> broker >>>>> on >>>>>>>>> the >>>>>>>>>>>>>>> same >>>>>>>>>>>>>>> machine via socket. It is also harder to do automatic load >>>>>>>> balance >>>>>>>>>>>>>>> between >>>>>>>>>>>>>>> disks on the same machine in the future. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I will put this and the explanation in the rejected >>> alternative >>>>>>>>>>>>> section. >>>>>>>>>>>>>>> I >>>>>>>>>>>>>>> have a few questions: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> - Can you explain why this solution can help avoid >>> scalability >>>>>>>>>>>>>>> bottleneck? >>>>>>>>>>>>>>> I actually think it will exacerbate the scalability >>> problem due >>>>>>>> the >>>>>>>>>>>> 2) >>>>>>>>>>>>>>> above. >>>>>>>>>>>>>>> - Why can we push more RPC with this solution? >>>>>>>>>>>>>> >>>>>>>>>>>>>> To really answer this question we'd have to take a deep >>> dive into >>>>>>>>> the >>>>>>>>>>>>>> locking of the broker and figure out how effectively it can >>>>>>>>>> parallelize >>>>>>>>>>>>>> truly independent requests. 
Almost every multithreaded >>> process >>>>> is >>>>>>>>>>>> going >>>>>>>>>>>>>> to have shared state, like shared queues or shared sockets, >>> that >>>>>>>> is >>>>>>>>>>>>>> going to make scaling less than linear when you add disks or >>>>>>>>>>>> processors. >>>>>>>>>>>>>> (And clearly, another option is to improve that scalability, >>>>>>>> rather >>>>>>>>>>>>>> than going multi-process!) >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Yeah I also think it is better to improve scalability inside >>> kafka >>>>>>>>> code >>>>>>>>>>>> if >>>>>>>>>>>>> possible. I am not sure we currently have any scalability >>> issue >>>>>>>>> inside >>>>>>>>>>>>> Kafka that can not be removed without using multi-process. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>>> - It is true that a garbage collection in one broker would >>> not >>>>>>>>> affect >>>>>>>>>>>>>>> others. But that is after every broker only uses 1/10 of >>> the >>>>>>>>> memory. >>>>>>>>>>>>> Can >>>>>>>>>>>>>>> we be sure that this will actually help performance? >>>>>>>>>>>>>> >>>>>>>>>>>>>> The big question is, how much memory do Kafka brokers use >>> now, >>>>> and >>>>>>>>> how >>>>>>>>>>>>>> much will they use in the future? Our experience in HDFS >>> was >>>>> that >>>>>>>>>> once >>>>>>>>>>>>>> you start getting more than 100-200GB Java heap sizes, full >>> GCs >>>>>>>>> start >>>>>>>>>>>>>> taking minutes to finish when using the standard JVMs. That >>>>> alone >>>>>>>>> is >>>>>>>>>> a >>>>>>>>>>>>>> good reason to go multi-process or consider storing more >>> things >>>>>>>> off >>>>>>>>>> the >>>>>>>>>>>>>> Java heap. >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> I see. Now I agree one-broker-per-disk should be more >>> efficient in >>>>>>>>>> terms >>>>>>>>>>>> of >>>>>>>>>>>>> GC since each broker probably needs less than 1/10 of the >>> memory >>>>>>>>>>>> available >>>>>>>>>>>>> on a typical machine nowadays. I will remove this from the >>> reason >>>>>>>> of >>>>>>>>>>>>> rejection. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Disk failure is the "easy" case. The "hard" case, which is >>>>>>>>>>>>>> unfortunately also the much more common case, is disk >>>>> misbehavior. >>>>>>>>>>>>>> Towards the end of their lives, disks tend to start slowing >>> down >>>>>>>>>>>>>> unpredictably. Requests that would have completed >>> immediately >>>>>>>>> before >>>>>>>>>>>>>> start taking 20, 100 500 milliseconds. Some files may be >>>>> readable >>>>>>>>> and >>>>>>>>>>>>>> other files may not be. System calls hang, sometimes >>> forever, >>>>> and >>>>>>>>> the >>>>>>>>>>>>>> Java process can't abort them, because the hang is in the >>> kernel. >>>>>>>>> It >>>>>>>>>>>> is >>>>>>>>>>>>>> not fun when threads are stuck in "D state" >>>>>>>>>>>>>> http://stackoverflow.com/quest >>> ions/20423521/process-perminan >>>>>>>>>>>>>> tly-stuck-on-d-state >>>>>>>>>>>>>> . Even kill -9 cannot abort the thread then. Fortunately, >>> this >>>>>>>> is >>>>>>>>>>>>>> rare. >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> I agree it is a harder problem and it is rare. We probably >>> don't >>>>>>>> have >>>>>>>>>> to >>>>>>>>>>>>> worry about it in this KIP since this issue is orthogonal to >>>>>>>> whether >>>>>>>>> or >>>>>>>>>>>> not >>>>>>>>>>>>> we use JBOD. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Another approach we should consider is for Kafka to >>> implement its >>>>>>>>> own >>>>>>>>>>>>>> storage layer that would stripe across multiple disks. 
This >>>>>>>>> wouldn't >>>>>>>>>>>>>> have to be done at the block level, but could be done at >>> the file >>>>>>>>>>>> level. >>>>>>>>>>>>>> We could use consistent hashing to determine which disks a >>> file >>>>>>>>> should >>>>>>>>>>>>>> end up on, for example. >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Are you suggesting that we should distribute log, or log >>> segment, >>>>>>>>>> across >>>>>>>>>>>>> disks of brokers? I am not sure if I fully understand this >>>>>>>> approach. >>>>>>>>> My >>>>>>>>>>>> gut >>>>>>>>>>>>> feel is that this would be a drastic solution that would >>> require >>>>>>>>>>>>> non-trivial design. While this may be useful to Kafka, I >>> would >>>>>>>> prefer >>>>>>>>>> not >>>>>>>>>>>>> to discuss this in detail in this thread unless you believe >>> it is >>>>>>>>>>>> strictly >>>>>>>>>>>>> superior to the design in this KIP in terms of solving our >>>>>>>> use-case. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>>> best, >>>>>>>>>>>>>> Colin >>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> Dong >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe < >>>>>>>> cmcc...@apache.org >>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Dong, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the writeup! It's very interesting. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I apologize in advance if this has been discussed >>> somewhere >>>>>>>> else. >>>>>>>>>>>>> But >>>>>>>>>>>>>> I >>>>>>>>>>>>>>>> am curious if you have considered the solution of running >>>>>>>> multiple >>>>>>>>>>>>>>>> brokers per node. Clearly there is a memory overhead >>> with this >>>>>>>>>>>>>> solution >>>>>>>>>>>>>>>> because of the fixed cost of starting multiple JVMs. >>> However, >>>>>>>>>>>>> running >>>>>>>>>>>>>>>> multiple JVMs would help avoid scalability bottlenecks. >>> You >>>>>>>> could >>>>>>>>>>>>>>>> probably push more RPCs per second, for example. A >>> garbage >>>>>>>>>>>>> collection >>>>>>>>>>>>>>>> in one broker would not affect the others. It would be >>>>>>>>> interesting >>>>>>>>>>>>> to >>>>>>>>>>>>>>>> see this considered in the "alternate designs" design, >>> even if >>>>>>>> you >>>>>>>>>>>>> end >>>>>>>>>>>>>>>> up deciding it's not the way to go. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> best, >>>>>>>>>>>>>>>> Colin >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote: >>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please >>> find >>>>>>>> the >>>>>>>>>>>>> KIP >>>>>>>>>>>>>>>>> wiki >>>>>>>>>>>>>>>>> in the link https://cwiki.apache.org/confl >>>>>>>>>>>> uence/display/KAFKA/KIP- >>>>>>>>>>>>>>>>> 112%3A+Handle+disk+failure+for+JBOD. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> This KIP is related to KIP-113 >>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>>>>>>>>>>>> 113%3A+Support+replicas+moveme >>> nt+between+log+directories>: >>>>>>>>>>>>>>>>> Support replicas movement between log directories. They >>> are >>>>>>>>>>>> needed >>>>>>>>>>>>> in >>>>>>>>>>>>>>>>> order >>>>>>>>>>>>>>>>> to support JBOD in Kafka. Please help review the KIP. You >>>>>>>>>>>> feedback >>>>>>>>>>>>> is >>>>>>>>>>>>>>>>> appreciated! 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Dong
>>>>>>>>
>>>>>>>> --
>>>>>>>> Grant Henke
>>>>>>>> Software Engineer | Cloudera
>>>>>>>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
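On Colin's point earlier in the thread about boolean-returning file APIs, here is a sketch of the kind of hardening he describes, with hypothetical names (markLogDirOffline is not a real Kafka method). The old java.io.File.delete() call only returns false, hiding whether the cause was a missing file, a permission problem, or a dying disk; java.nio.file.Files surfaces the distinction so each case can be handled differently.

import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Illustrative only: markLogDirOffline is a hypothetical callback, not an actual Kafka method.
public class SegmentDeleter {

    interface LogDirFailureListener {
        void markLogDirOffline(Path logDir, IOException cause);
    }

    private final LogDirFailureListener listener;

    public SegmentDeleter(LogDirFailureListener listener) {
        this.listener = listener;
    }

    public void deleteSegment(Path logDir, Path segment) {
        try {
            Files.delete(segment);
        } catch (NoSuchFileException e) {
            // Already gone: harmless, e.g. a retried asynchronous delete.
        } catch (AccessDeniedException e) {
            // Misconfiguration (permissions), not necessarily a bad disk;
            // fail fast and loudly rather than silently marking the disk bad.
            throw new IllegalStateException("Cannot delete " + segment, e);
        } catch (IOException e) {
            // Genuine I/O failure: treat the whole log directory as offline
            // instead of crashing the broker.
            listener.markLogDirOffline(logDir, e);
        }
    }
}

The same pattern would apply to renames, directory creation and flushes: only genuine I/O errors should feed into the offline-log-directory handling, while configuration problems should surface as hard errors.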