Hey Eno, I forgot that. Sure, that works for us.
Thanks,
Dong

On Thu, Feb 2, 2017 at 2:03 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Dong,
>
> The KIP meetings are traditionally held at 11am. Would that also work? So Tuesday 7th at 11am?
>
> Thanks
> Eno
>
> On 2 Feb 2017, at 02:53, Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hey Eno, Colin,
> >
> > Would you have time next Tuesday morning to discuss the KIP? How about 10 - 11 am?
> >
> > To make the best use of our time, can you please invite one or more committers from Confluent to join the meeting? I hope the KIP can receive one or more +1s from committers at Confluent if there is no remaining concern about the KIP after the KIP meeting.
> >
> > In the meantime, please feel free to provide comments in the thread so that discussion in the KIP meeting can be more efficient.
> >
> > Thanks,
> > Dong
> >
> > On Wed, Feb 1, 2017 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> >> Hey Colin,
> >>
> >> Thanks much for the comment. Please see my reply inline.
> >>
> >> On Wed, Feb 1, 2017 at 1:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >>
> >>> On Wed, Feb 1, 2017, at 11:35, Dong Lin wrote:
> >>>> Hey Grant, Colin,
> >>>>
> >>>> My bad, I misunderstood Grant's suggestion initially. Indeed this is a very interesting idea: just wait replica.lag.time.max.ms for the replica on the bad disk to drop out of the ISR, instead of having the broker actively report the failure to the controller.
> >>>>
> >>>> I have several concerns with this approach.
> >>>>
> >>>> - The broker needs to persist the information of all partitions it has created, in a file on each disk. This is needed for the broker to know which replicas were already created on the bad disks that it can no longer access. If we don't do this, then when the controller sends a LeaderAndIsrRequest asking a broker to become follower for a partition on the bad disk, the broker will create the partition on a good disk. The good disks may then be overloaded as a cascading effect.
> >>>>
> >>>> While it is possible to let the broker keep track of the replicas it has created, I think it is less clean than the approach in the current KIP, for the reasons provided in the rejected alternatives section.
> >>>>
> >>>> - A change is needed in the controller logic to handle the failure to make a broker the leader when the controller receives a LeaderAndIsrResponse. Otherwise, things go wrong if a partition on the bad disk is asked to become leader. As of now, the controller doesn't handle errors in the LeaderAndIsrResponse.
> >>>>
> >>>> - We still need tools and a mechanism for the administrator to know whether a replica is offline due to a bad disk. I am worried that asking administrators to log into a machine and dig this information out of the logs is not scalable when the number of brokers is large. Although each company can develop internal tools to get this information, it is a waste of developer time to reinvent the wheel. Reading this information from the logs also seems less reliable than getting it from a Kafka request/response.
> >>>>
> >>>> I guess the goal of this alternative approach is to avoid making major changes in Kafka, at the cost of increased disk failure discovery time etc. But I think the changes required to fix the problems above won't be much smaller.
> >>>
> >>> Thanks for the thoughtful replies, Dong L.
> >>>
> >>> Instead of having an "offline" state, how about having a "creating" state for replicas and a "created" state? Then if a replica was not accessible on any disk, but still in "created" state, the broker could know that something had gone wrong. This also would catch issues like the broker being started without all log directories configured, or disks not being correctly mounted at the expected mount points, leading to empty directories.
> >>
> >> Indeed, we need an additional state per replica to solve this problem. The current KIP design addresses it by putting the "created" state in zookeeper, as you can see in the public interface change of the KIP. Are you suggesting we solve the problem by storing this information on the broker's local disks instead of in zookeeper? I have two concerns with that approach:
> >>
> >> - It requires the broker to keep track of the replicas it has created. This splits the task of determining offline replicas between the controller and the brokers, as opposed to the current Kafka design, where the controller determines the states of replicas and propagates this information to brokers. We think it is less error-prone to keep the controller as the only entity that maintains the metadata (e.g. replica state) of the Kafka cluster.
> >>
> >> - If we store this information on local disk, then we need an additional request/response protocol to ask a broker to reset this information, e.g. after a bad disk is replaced with a good disk, so that the replica can be re-created on a good disk. Things are easier if we store this information in zookeeper.
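To make the zookeeper-backed "created" state concrete, here is a minimal sketch of how a per-replica flag might be written. The znode path and the ReplicaStateZkSketch class are hypothetical illustrations, not the KIP's actual schema; the point is that the controller can distinguish "never created" from "created but now inaccessible" without trusting broker-local state:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class ReplicaStateZkSketch {
        private final ZooKeeper zk;

        public ReplicaStateZkSketch(ZooKeeper zk) {
            this.zk = zk;
        }

        // Record that brokerId has physically created its replica of a
        // topic-partition, e.g. markCreated("mytopic", 0, 101).
        public void markCreated(String topic, int partition, int brokerId)
                throws KeeperException, InterruptedException {
            // Hypothetical path (assumes parent znodes already exist);
            // the real KIP defines its own zookeeper layout.
            String path = String.format(
                    "/brokers/topics/%s/partitions/%d/replicas/%d/created",
                    topic, partition, brokerId);
            zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    }

Resetting the flag after a disk replacement is then a zookeeper delete rather than a new broker request/response protocol, which is the trade-off Dong describes above.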
> >>>> To answer Colin's questions:
> >>>>
> >>>> - There is no action required on the side of the administrator in case of log directory failure.
> >>>>
> >>>> - The broker itself is going to discover log directory failure and declare offline replicas. The broker doesn't explicitly declare log directory failure. But the administrator can learn from the MetadataResponse that a replica is offline due to disk failure, i.e. if the replica is offline while the broker is online.
> >>>
> >>> Can you expand on this a little bit? It sounds like you are considering dealing with failures on a replica-by-replica basis, rather than a disk-by-disk basis. But it's disks that fail, not really individual files or directories on disks. This decision interacts poorly with the lack of a periodic scanner. It's easy to imagine a scenario where an infrequently used replica sits on a dead disk for a long time without us declaring it dead.
> >>
> >> Sure. The broker will deal with failures on a disk-by-disk basis. All replicas on a disk are considered failed if there is any disk-related exception when the broker accesses that disk. It means that if any replica on that disk can not be read, then all replicas on that disk are considered offline. Since the controller doesn't know which disks replicas live on, it has to learn the liveness of replicas on a replica-by-replica basis in order to do leader election.
> >>
> >> Besides, I don't think we will have a problem with the scenario you described. If a replica is indeed not touched for a long time, then it doesn't matter whether it is considered dead or not. The moment it is needed, either for read or for write, the KIP makes sure that we know its state and make leader election accordingly.
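A minimal sketch of the disk-by-disk handling described above; the LogDirFailureHandler class and its bookkeeping are illustrative assumptions, not actual Kafka code. The idea is that one disk-related IOException marks the whole log directory, and every replica on it, offline instead of crashing the broker:

    import java.io.IOException;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class LogDirFailureHandler {
        // logDir -> replicas hosted on that directory (assumed bookkeeping).
        private final Map<String, Set<String>> replicasByDir = new ConcurrentHashMap<>();
        private final Set<String> offlineDirs = ConcurrentHashMap.newKeySet();

        public void addReplica(String logDir, String topicPartition) {
            replicasByDir.computeIfAbsent(logDir, d -> ConcurrentHashMap.newKeySet())
                         .add(topicPartition);
        }

        // Called from any code path that hits a disk-related exception.
        public void onDiskError(String logDir, IOException cause) {
            if (!offlineDirs.add(logDir)) {
                return; // this directory is already marked offline
            }
            // It is disks that fail, not individual files: one error marks
            // every replica on the directory offline at once.
            Set<String> offline = replicasByDir.getOrDefault(logDir, Set.of());
            System.err.printf("Log directory %s failed (%s); %d replicas offline%n",
                    logDir, cause.getMessage(), offline.size());
            // The broker keeps serving replicas on healthy directories; the
            // controller learns of the offline replicas and re-elects leaders.
        }
    }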
> >>>> - This KIP does not handle cases where a few disks on a broker are full but the others have space. If a disk is full and can not be written to, then the disk is considered to have failed. The imbalance across disks is an existing problem and will be handled in KIP-113.
> >>>
> >>> OK.
> >>>
> >>>> - This KIP does not add a disk scanner that periodically checks for error conditions. It doesn't handle any performance degradation of disks. We wait for a failure to happen before declaring a disk bad.
> >>>>
> >>>> Yes, this KIP requires us to fix cases in the code where we are suppressing disk errors or ignoring their root cause. But the decision of which exceptions should be considered disk failures, and how to handle each of them, is more of an implementation detail. I hope we can focus on the high-level idea of this KIP and only worry about specific exceptions when the patch is being reviewed.
> >>>
> >>> Hmm... I think we should discuss how we are going to harden the code against disk failures, and verify that it has been hardened. Or maybe we could do this in a follow-up KIP.
> >>
> >> By "harden the code against disk errors" do you mean that we should make a full list of the disk-related exceptions we may see and decide whether to treat each of them differently? I agree that is useful in the long term, but it is actually out of the scope of this KIP. The KIP will re-use the existing KafkaStorageException without changing which exceptions are considered a KafkaStorageException. The goal is to fail the replicas on a disk instead of crashing the broker when a KafkaStorageException is observed.
> >>
> >> Thanks,
> >> Dong
> >>
> >>>> After all, we probably only know the list of exceptions and the ways to handle them when we start to implement the KIP. And we need to improve this list over time as we discover various failures in deployments.
> >>>>
> >>>> Hey Eno,
> >>>>
> >>>> Sure thing. Thanks for offering time to have a KIP meeting to discuss this. I will ask other Kafka developers at LinkedIn about their availability.
> >>>
> >>> Yeah, it would be nice to talk about this.
> >>>
> >>> regards,
> >>> Colin
> >>>
> >>>> Thanks,
> >>>> Dong
> >>>>
> >>>> On Wed, Feb 1, 2017 at 10:37 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>
> >>>>> Hi Dong,
> >>>>>
> >>>>> Would it make sense to do a discussion over video/voice about this? I think it's sufficiently complex that we can probably make quicker progress that way. So shall we do a KIP meeting soon? I can do this week (Thu/Fri) or next week.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>> On 1 Feb 2017, at 18:29, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>
> >>>>>> Hmm. Maybe I misinterpreted, but I got the impression that Grant was suggesting that we avoid introducing this concept of "offline replicas" for now. Is that feasible?
> >>>>>>
> >>>>>> What is the strategy for declaring a log directory bad? Is it an administrative action? Or is the broker itself going to be responsible for this?
> >>>>>> How do we handle cases where a few disks on a broker are full, but the others have space?
> >>>>>>
> >>>>>> Are we going to have a disk scanner that will periodically check for error conditions (similar to the background checks that RAID controllers do)? Or will we wait for a failure to happen before declaring a disk bad?
> >>>>>>
> >>>>>> It seems to me that if we want this to work well we will need to fix cases in the code where we are suppressing disk errors or ignoring their root cause. For example, any place where we are using the old Java APIs that just return a boolean on failure will need to be fixed, since the failure could now be disk full, permission denied, or an IOException, and we will need to handle those cases differently. Also, we will need to harden the code against disk errors. Formerly it was OK to just crash on a disk error; now it is not. It would be nice to see more in the test plan about injecting IOExceptions into the disk handling code and verifying that we can handle it correctly.
> >>>>>>
> >>>>>> regards,
> >>>>>> Colin
> >>>>>>
> >>>>>> On Wed, Feb 1, 2017, at 10:02, Dong Lin wrote:
> >>>>>>> Hey Grant,
> >>>>>>>
> >>>>>>> Yes, this KIP does exactly what you described :)
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>> On Wed, Feb 1, 2017 at 9:45 AM, Grant Henke <ghe...@cloudera.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Dong,
> >>>>>>>>
> >>>>>>>> Thanks for putting this together.
> >>>>>>>>
> >>>>>>>> Since we are discussing alternative/simplified options: have you considered handling the disk failures broker-side to prevent a crash, marking the disk as "bad" to that individual broker, and continuing as normal? I imagine the broker would then fall out of sync for the replicas hosted on the bad disk and the ISR would shrink. This would allow people using min.isr to keep their data safe, and the cluster operators would see a shrink in many ISRs and hopefully an obvious log message leading to a quick fix. I haven't thought through this idea in depth though, so there could be some shortfalls.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Grant
> >>>>>>>>
> >>>>>>>> On Wed, Feb 1, 2017 at 11:21 AM, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hey Eno,
> >>>>>>>>>
> >>>>>>>>> Thanks much for the review.
> >>>>>>>>>
> >>>>>>>>> I think your suggestion is to split the disks of a machine into multiple disk sets and run one broker per disk set. Yeah, this is similar to Colin's suggestion of one-broker-per-disk, which we have evaluated at LinkedIn and consider a good short-term approach.
> >>>>>>>>>
> >>>>>>>>> As of now I don't think any of these approaches is a better alternative in the long term. I will summarize them here; I have also put these reasons in the KIP's motivation section and rejected alternatives section. I am happy to discuss more, and I would certainly like to use an alternative solution that is easier to implement and has better performance.
> >>>>>>>>> - JBOD vs. RAID-10: if we switch from RAID-10 with replication-factor=2 to JBOD with replication-factor=3, we get a 25% reduction in disk usage and double the number of broker failures tolerated before data becomes unavailable, from 1 to 2. This is a pretty huge gain for any company that uses Kafka at large scale.
> >>>>>>>>>
> >>>>>>>>> - JBOD vs. one-broker-per-disk: The benefit of one-broker-per-disk is that no major code change is needed in Kafka. Among the disadvantages of one-broker-per-disk summarized in the KIP and the earlier exchange with Colin, the biggest one is the 15% throughput loss compared to JBOD, along with less flexibility to balance across disks. Further, it probably requires changes to internal deployment tools at various companies to deal with the one-broker-per-disk setup.
> >>>>>>>>>
> >>>>>>>>> - JBOD vs. RAID-0: This is the setup used at Microsoft. The problem is that a broker becomes unavailable if any disk fails. Suppose replication-factor=2 and there are 10 disks per machine. Then the probability of any message becoming unavailable due to disk failure is 100X higher with RAID-0 than with JBOD.
> >>>>>>>>>
> >>>>>>>>> - JBOD vs. one-broker-per-few-disks: one-broker-per-few-disks is somewhere between one-broker-per-disk and RAID-0, so it carries an average of the disadvantages of those two approaches.
> >>>>>>>>>
> >>>>>>>>> To answer your question: I think it is reasonable to manage disks in Kafka. By "managing disks" we mean the management of the assignment of replicas across disks. Here are my reasons in more detail:
> >>>>>>>>>
> >>>>>>>>> - I don't think this KIP is a big step change. By already allowing the user to configure Kafka with multiple log directories or disks, Kafka implicitly manages disks; it is just not a complete feature. Microsoft and probably other companies are using this feature with the undesirable effect that a broker fails if any disk fails. It is good to complete this feature.
> >>>>>>>>>
> >>>>>>>>> - I think it is reasonable to manage disks in Kafka. One of the most important things Kafka does is determine the assignment of replicas across brokers and make sure enough copies of a given replica are available. Conceptually, determining the assignment of replicas across disks is not much different.
> >>>>>>>>>
> >>>>>>>>> - I would agree that this KIP improves the performance of Kafka at the cost of more complexity inside Kafka, by switching from RAID-10 to JBOD. I would argue that this is the right direction. If we could gain 20%+ performance by managing NICs in Kafka as compared to the existing approach and other alternatives, I would say we should just do it. Such a gain in performance, or equivalently reduction in cost, can save millions of dollars per year for any company running Kafka at large scale.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Dong
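As a back-of-the-envelope check on the comparisons above (a rough sketch, assuming independent disk failures): RAID-10 with replication-factor=2 keeps 2 mirrored copies of each of 2 replicas, i.e. 4 physical copies of each message, while JBOD with replication-factor=3 keeps 3 copies, i.e. 3/4 of the disk usage, which is the 25% reduction cited; and losing one broker still leaves 2 replicas instead of 1. For the RAID-0 comparison: with 10 disks per machine, a RAID-0 broker is lost whenever any one of its 10 disks fails, so its failure probability is roughly 10p for per-disk failure probability p. With replication-factor=2, a message becomes unavailable when both of its brokers are lost: (10p)^2 = 100 p^2 with RAID-0 versus roughly p^2 with JBOD, where only the two specific disks holding the replicas matter. That ratio is the 100X factor cited above.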
> >>>>>>>>> On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> I'm coming somewhat late to the discussion, apologies for that.
> >>>>>>>>>>
> >>>>>>>>>> I'm worried about this proposal. It's moving Kafka to a world where it manages disks. So in a sense the scope of the KIP is limited, but the direction it sets for Kafka is quite a big step change. Fundamentally this is about balancing resources for a Kafka broker. This can be done by a tool, rather than by changing Kafka. E.g., the tool would take a bunch of disks together, create a volume over them and export that to a Kafka broker (in addition to setting the memory limits for that broker or limiting other resources). A different bunch of disks can then make up a second volume, and be used by another Kafka broker. This is aligned with what Colin is saying (as I understand it).
> >>>>>>>>>>
> >>>>>>>>>> Disks are not the only resource on a machine; there are several instances where multiple NICs are used, for example. Do we want fine-grained management of all these resources? I'd argue that opens up the system to a lot of complexity.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>> Eno
> >>>>>>>>>>
> >>>>>>>>>>> On 1 Feb 2017, at 01:53, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> I am going to initiate the vote if there is no further concern with the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Dong
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Jan 27, 2017 at 8:08 PM, radai <radai.rosenbl...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> a few extra points:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. broker per disk might also incur more client <--> broker sockets: suppose every producer / consumer "talks" to more than one partition, there's a very good chance that partitions that were co-located on a single 10-disk broker would now be split between several single-disk broker processes on the same machine. hard to put a multiplier on this, but likely >x1. sockets are a limited resource at the OS level and incur some memory cost (kernel buffers).
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. there's a memory overhead to spinning up a JVM (compiled code and byte code objects etc). if we assume this overhead is ~300 MB (order of magnitude, specifics vary) then spinning up 10 JVMs would lose you 3 GB of RAM. not a ton, but non-negligible.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. there would also be some overhead downstream of kafka in any management / monitoring / log aggregation system. likely less than x10 though.
> >>>>>>>>>>>> 4. (related to the above) added complexity of administration with more running instances.
> >>>>>>>>>>>>
> >>>>>>>>>>>> is anyone running kafka with anywhere near 100GB heaps? i thought the point was to rely on the kernel page cache to do the disk buffering ....
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks much for the comment. Please see my comment inline.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
> >>>>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Good point! Yeah, we have actually considered and tested this solution, which we call one-broker-per-disk. It would work and should require no major change in Kafka as compared to this JBOD KIP, so it would be a good short-term solution.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> But it has a few drawbacks which make it less desirable in the long term. Assume we have 10 disks on a machine. Here are the problems:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the thoughtful reply.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) Our stress test results show that one-broker-per-disk has 15% lower throughput.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) The controller would need to send 10X as many LeaderAndIsrRequests, MetadataUpdateRequests and StopReplicaRequests. This increases the burden on the controller, which can be the performance bottleneck.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Maybe I'm misunderstanding something, but there would not be 10x as many StopReplicaRequest RPCs, would there? The other requests would increase 10x, but from a pretty low base, right? We are not reassigning partitions all the time, I hope (or else we have bigger problems...)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think the controller groups StopReplicaRequests per broker and sends only one StopReplicaRequest to a broker during controlled shutdown. Anyway, we don't have to worry about this if we agree that the other requests will increase by 10X. One MetadataRequest is sent to each broker in the cluster every time there is a leadership change. I am not sure this is a real problem. But in theory this makes the overhead complexity O(number of brokers) and may be a concern in the future. Ideally we should avoid it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3) Less efficient use of physical resources on the machine. The number of sockets on each machine will increase by 10X, and the number of connections between any two machines will increase by 100X (see the rough arithmetic after this list).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 4) A less efficient way to manage memory and quotas.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 5) Rebalancing between disks/brokers on the same machine will be less efficient and less flexible. A broker has to read data from another broker on the same machine via a socket. It is also harder to do automatic load balancing between disks on the same machine in the future.
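A rough way to see the 10X/100X in item 3 (an illustrative count, assuming replication traffic forms a mesh between brokers, not a measurement): with one broker per machine, a pair of machines needs on the order of one replication connection between them; with 10 single-disk brokers per machine, each of the 10 brokers on machine A may need a connection to each of the 10 brokers on machine B, i.e. up to 10 x 10 = 100 connections per machine pair, while each machine's total socket count grows roughly with its broker count, i.e. 10X.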
> >>>>>>>>>>>>>>> I will put this and the explanation in the rejected alternatives section. I have a few questions:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - Can you explain why this solution can help avoid a scalability bottleneck? I actually think it will exacerbate the scalability problem due to 2) above.
> >>>>>>>>>>>>>>> - Why can we push more RPCs with this solution?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> To really answer this question we'd have to take a deep dive into the locking of the broker and figure out how effectively it can parallelize truly independent requests. Almost every multithreaded process is going to have shared state, like shared queues or shared sockets, that is going to make scaling less than linear when you add disks or processors. (And clearly, another option is to improve that scalability, rather than going multi-process!)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Yeah, I also think it is better to improve scalability inside the Kafka code if possible. I am not sure we currently have any scalability issue inside Kafka that can not be removed without going multi-process.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - It is true that a garbage collection in one broker would not affect others. But that is only after every broker uses just 1/10 of the memory. Can we be sure that this will actually help performance?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The big question is, how much memory do Kafka brokers use now, and how much will they use in the future? Our experience in HDFS was that once you start getting more than 100-200GB Java heap sizes, full GCs start taking minutes to finish when using the standard JVMs. That alone is a good reason to go multi-process or to consider storing more things off the Java heap.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I see.
> >>>>>>>>>>>>> Now I agree one-broker-per-disk should be more efficient in terms of GC, since each broker probably needs less than 1/10 of the memory available on a typical machine nowadays. I will remove this from the reasons for rejection.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Disk failure is the "easy" case. The "hard" case, which is unfortunately also the much more common case, is disk misbehavior. Towards the end of their lives, disks tend to start slowing down unpredictably. Requests that would have completed immediately before start taking 20, 100, 500 milliseconds. Some files may be readable and other files may not be. System calls hang, sometimes forever, and the Java process can't abort them, because the hang is in the kernel. It is not fun when threads are stuck in "D state": http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state. Even kill -9 cannot abort the thread then. Fortunately, this is rare.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree it is a harder problem, and it is rare. We probably don't have to worry about it in this KIP, since the issue is orthogonal to whether or not we use JBOD.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Another approach we should consider is for Kafka to implement its own storage layer that would stripe across multiple disks. This wouldn't have to be done at the block level, but could be done at the file level. We could use consistent hashing to determine which disks a file should end up on, for example.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Are you suggesting that we should distribute the log, or log segments, across the disks of brokers? I am not sure I fully understand this approach. My gut feeling is that this would be a drastic solution requiring non-trivial design work. While it may be useful to Kafka, I would prefer not to discuss it in detail in this thread unless you believe it is strictly superior to the design in this KIP in terms of solving our use-case.
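For reference, a minimal sketch of the consistent-hashing placement Colin mentions; the DiskRing class is an illustrative assumption, not code from the KIP or Kafka. Each file hashes to a point on a ring of disks, so adding or removing a disk relocates only a small fraction of files:

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;
    import java.util.zip.CRC32;

    public class DiskRing {
        private final SortedMap<Long, String> ring = new TreeMap<>();

        public DiskRing(List<String> disks, int virtualNodesPerDisk) {
            for (String disk : disks) {
                // Virtual nodes smooth out the distribution across disks.
                for (int v = 0; v < virtualNodesPerDisk; v++) {
                    ring.put(hash(disk + "#" + v), disk);
                }
            }
        }

        public String diskFor(String fileName) {
            // Walk clockwise from the file's hash to the next disk on the
            // ring, wrapping around at the end.
            SortedMap<Long, String> tail = ring.tailMap(hash(fileName));
            return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
        }

        private static long hash(String s) {
            CRC32 crc = new CRC32(); // chosen only for brevity in this sketch
            crc.update(s.getBytes(StandardCharsets.UTF_8));
            return crc.getValue();
        }

        public static void main(String[] args) {
            DiskRing ring = new DiskRing(List.of("/data0", "/data1", "/data2"), 16);
            System.out.println(ring.diskFor("my-topic-0/00000000000000000000.log"));
        }
    }

With N disks, removing one relocates roughly 1/N of the files, which is the property that makes this attractive for a file-level striping layer.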
> >>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the writeup! It's very interesting.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I apologize in advance if this has been discussed somewhere else. But I am curious if you have considered the solution of running multiple brokers per node. Clearly there is a memory overhead with this solution because of the fixed cost of starting multiple JVMs. However, running multiple JVMs would help avoid scalability bottlenecks. You could probably push more RPCs per second, for example. A garbage collection in one broker would not affect the others. It would be interesting to see this considered in the "alternate designs" section, even if you end up deciding it's not the way to go.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We created KIP-112: Handle disk failure for JBOD. Please find the KIP wiki at https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This KIP is related to KIP-113 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>: Support replicas movement between log directories. They are both needed in order to support JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Dong
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Grant Henke
> >>>>>>>> Software Engineer | Cloudera
> >>>>>>>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke