Hey Jun,

Thanks for all your help and the time you have spent discussing this KIP. When you get a chance, could you let me know whether the previous answers address your concerns?
I think the more interesting question in your last email is where we should store the "created" flag in ZK. I proposed the solution that I like most, i.e. store it together with the replica assignment data in /brokers/topics/[topic]. In order to expedite the discussion, let me offer two more ideas to address the concern, just in case the first one doesn't work:

- We can avoid the extra controller ZK read when there is no disk failure (95% of the time?). When the controller starts, it doesn't read controller_managed_state in ZK and sends LeaderAndIsrRequest with "create = false". Only if the LeaderAndIsrResponse shows a failure for some replica does the controller read controller_managed_state for that partition and re-send LeaderAndIsrRequest with "create = true" if the replica has not been created.

- We can significantly reduce this ZK read time by making controller_managed_state topic-level information in ZK, e.g. /brokers/topics/[topic]/state. Given that most topics have 10+ partitions, the extra ZK read time should be less than 10% of the existing total ZK read time during controller failover.

Thanks!
Dong

On Tue, Feb 14, 2017 at 7:30 AM, Dong Lin <lindon...@gmail.com> wrote: > Hey Jun, > > I just realized that you may be suggesting that a tool for listing offline > directories is necessary for KIP-112 by asking whether KIP-112 and KIP-113 > will be in the same release. I think such a tool is useful but doesn't have > to be included in KIP-112. This is because as of now admin needs to log > into broker machine and check broker log to figure out the cause of broker > failure and the bad log directory in case of disk failure. The KIP-112 > won't make it harder since admin can still figure out the bad log directory > by doing the same thing. Thus it is probably OK to just include this script > in KIP-113. Regardless, my hope is to finish both KIPs ASAP and make them > in the same release since both KIPs are needed for the JBOD setup. > > Thanks, > Dong > > On Mon, Feb 13, 2017 at 5:52 PM, Dong Lin <lindon...@gmail.com> wrote: > >> And the test plan has also been updated to simulate disk failure by >> changing log directory permission to 000. >> >> On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin <lindon...@gmail.com> wrote: >> >>> Hi Jun, >>> >>> Thanks for the reply. These comments are very helpful. Let me answer >>> them inline. >>> >>> >>> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <j...@confluent.io> wrote: >>> >>>> Hi, Dong, >>>> >>>> Thanks for the reply. A few more replies and new comments below. >>>> >>>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindon...@gmail.com> wrote: >>>> >>>> > Hi Jun, >>>> > >>>> > Thanks for the detailed comments. Please see answers inline: >>>> > >>>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <j...@confluent.io> wrote: >>>> > >>>> > > Hi, Dong, >>>> > > >>>> > > Thanks for the updated wiki. A few comments below. >>>> > > >>>> > > 1. Topics get created >>>> > > 1.1 Instead of storing successfully created replicas in ZK, could we >>>> > store >>>> > > unsuccessfully created replicas in ZK? Since the latter is less >>>> common, >>>> > it >>>> > > probably reduces the load on ZK. >>>> > > >>>> > >>>> > We can store unsuccessfully created replicas in ZK. But I am not sure >>>> if >>>> > that can reduce write load on ZK. >>>> > >>>> > If we want to reduce the write load on ZK by storing unsuccessfully >>>> created >>>> > replicas in ZK, then broker should not write to ZK if all replicas are >>>> > successfully created.
It means that if /broker/topics/[topic]/partiti >>>> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a >>>> given >>>> > partition, we have to assume all replicas of this partition have been >>>> > successfully created and send LeaderAndIsrRequest with create = >>>> false. This >>>> > becomes a problem if controller crashes before receiving >>>> > LeaderAndIsrResponse to validate whether a replica has been created. >>>> > >>>> > I think this approach and reduce the number of bytes stored in ZK. >>>> But I am >>>> > not sure if this is a concern. >>>> > >>>> > >>>> > >>>> I was mostly concerned about the controller failover time. Currently, >>>> the >>>> controller failover is likely dominated by the cost of reading >>>> topic/partition level information from ZK. If we add another partition >>>> level path in ZK, it probably will double the controller failover time. >>>> If >>>> the approach of representing the non-created replicas doesn't work, have >>>> you considered just adding the created flag in the leaderAndIsr path in >>>> ZK? >>>> >>>> >>> Yes, I have considered adding the created flag in the leaderAndIsr path >>> in ZK. If we were to add created flag per replica in the >>> LeaderAndIsrRequest, then it requires a lot of change in the code base. >>> >>> If we don't add created flag per replica in the LeaderAndIsrRequest, >>> then the information in leaderAndIsr path in ZK and LeaderAndIsrRequest >>> would be different. Further, the procedure for broker to update ISR in ZK >>> will be a bit complicated. When leader updates leaderAndIsr path in ZK, it >>> will have to first read created flags from ZK, change isr, and write >>> leaderAndIsr back to ZK. And it needs to check znode version and re-try >>> write operation in ZK if controller has updated ZK during this period. This >>> is in contrast to the current implementation where the leader either gets >>> all the information from LeaderAndIsrRequest sent by controller, or >>> determine the infromation by itself (e.g. ISR), before writing to >>> leaderAndIsr path in ZK. >>> >>> It seems to me that the above solution is a bit complicated and not >>> clean. Thus I come up with the design in this KIP to store this created >>> flag in a separate zk path. The path is named controller_managed_state to >>> indicate that we can store in this znode all information that is managed by >>> controller only, as opposed to ISR. >>> >>> I agree with your concern of increased ZK read time during controller >>> failover. How about we store the "created" information in the >>> znode /brokers/topics/[topic]? We can change that znode to have the >>> following data format: >>> >>> { >>> "version" : 2, >>> "created" : { >>> "1" : [1, 2, 3], >>> ... >>> } >>> "partition" : { >>> "1" : [1, 2, 3], >>> ... >>> } >>> } >>> >>> We won't have extra zk read using this solution. It also seems >>> reasonable to put the partition assignment information together with >>> replica creation information. The latter is only changed once after the >>> partition is created or re-assigned. >>> >>> >>>> >>>> >>>> > >>>> > > 1.2 If an error is received for a follower, does the controller >>>> eagerly >>>> > > remove it from ISR or do we just let the leader removes it after >>>> timeout? >>>> > > >>>> > >>>> > No, Controller will not actively remove it from ISR. But controller >>>> will >>>> > recognize it as offline replica and propagate this information to all >>>> > brokers via UpdateMetadataRequest. 
Each leader can use this >>>> information to >>>> > actively remove offline replica from ISR set. I have updated to wiki >>>> to >>>> > clarify it. >>>> > >>>> > >>>> >>>> That seems inconsistent with how the controller deals with offline >>>> replicas >>>> due to broker failures. When that happens, the broker will (1) select a >>>> new >>>> leader if the offline replica is the leader; (2) remove the replica from >>>> ISR if the offline replica is the follower. So, intuitively, it seems >>>> that >>>> we should be doing the same thing when dealing with offline replicas >>>> due to >>>> disk failure. >>>> >>> >>> My bad. I misunderstand how the controller currently handles broker >>> failure and ISR change. Yes we should do the same thing when dealing with >>> offline replicas here. I have updated the KIP to specify that, when an >>> offline replica is discovered by controller, the controller removes offline >>> replicas from ISR in the ZK and sends LeaderAndIsrRequest with updated ISR >>> to be used by partition leaders. >>> >>> >>>> >>>> >>>> >>>> > >>>> > > 1.3 Similar, if an error is received for a leader, should the >>>> controller >>>> > > trigger leader election again? >>>> > > >>>> > >>>> > Yes, controller will trigger leader election if leader replica is >>>> offline. >>>> > I have updated the wiki to clarify it. >>>> > >>>> > >>>> > > >>>> > > 2. A log directory stops working on a broker during runtime: >>>> > > 2.1 It seems the broker remembers the failed directory after >>>> hitting an >>>> > > IOException and the failed directory won't be used for creating new >>>> > > partitions until the broker is restarted? If so, could you add that >>>> to >>>> > the >>>> > > wiki. >>>> > > >>>> > >>>> > Right, broker assumes a log directory to be good after it starts, and >>>> mark >>>> > log directory as bad once there is IOException when broker attempts to >>>> > access the log directory. New replicas will only be created on good >>>> log >>>> > directory. I just added this to the KIP. >>>> > >>>> > >>>> > > 2.2 Could you be a bit more specific on how and during which >>>> operation >>>> > the >>>> > > broker detects directory failure? Is it when the broker hits an >>>> > IOException >>>> > > during writes, or both reads and writes? For example, during broker >>>> > > startup, it only reads from each of the log directories, if it hits >>>> an >>>> > > IOException there, does the broker immediately mark the directory as >>>> > > offline? >>>> > > >>>> > >>>> > Broker marks log directory as bad once there is IOException when >>>> broker >>>> > attempts to access the log directory. This includes read and write. >>>> These >>>> > operations include log append, log read, log cleaning, watermark >>>> checkpoint >>>> > etc. If broker hits IOException when it reads from each of the log >>>> > directory during startup, it immediately mark the directory as >>>> offline. >>>> > >>>> > I just updated the KIP to clarify it. >>>> > >>>> > >>>> > > 3. Partition reassignment: If we know a replica is offline, do we >>>> still >>>> > > want to send StopReplicaRequest to it? >>>> > > >>>> > >>>> > No, controller doesn't send StopReplicaRequest for an offline replica. >>>> > Controller treats this scenario in the same way that exiting Kafka >>>> > implementation does when the broker of this replica is offline. >>>> > >>>> > >>>> > > >>>> > > 4. 
UpdateMetadataRequestPartitionState: For offline_replicas, do >>>> they >>>> > only >>>> > > include offline replicas due to log directory failures or do they >>>> also >>>> > > include offline replicas due to broker failure? >>>> > > >>>> > >>>> > UpdateMetadataRequestPartitionState's offline_replicas include >>>> offline >>>> > replicas due to both log directory failure and broker failure. This >>>> is to >>>> > make the semantics of this field easier to understand. Broker can >>>> > distinguish whether a replica is offline due to broker failure or disk >>>> > failure by checking whether a broker is live in the >>>> UpdateMetadataRequest. >>>> > >>>> > >>>> > > >>>> > > 5. Tools: Could we add some kind of support in the tool to list >>>> offline >>>> > > directories? >>>> > > >>>> > >>>> > In KIP-112 we don't have tools to list offline directories because we >>>> have >>>> > intentionally avoided exposing log directory information (e.g. log >>>> > directory path) to user or other brokers. I think we can add this >>>> feature >>>> > in KIP-113, in which we will have DescribeDirsRequest to list log >>>> directory >>>> > information (e.g. partition assignment, path, size) needed for >>>> rebalance. >>>> > >>>> > >>>> Since we are introducing a new failure mode, if a replica becomes >>>> offline >>>> due to failure in log directories, the first thing an admin wants to >>>> know >>>> is which log directories are offline from the broker's perspective. So, >>>> including such a tool will be useful. Do you plan to do KIP-112 and >>>> KIP-113 >>>> in the same release? >>>> >>>> >>> Yes, I agree that including such a tool is using. This is probably >>> better to be added in KIP-113 because we need DescribeDirsRequest to get >>> this information. I will update KIP-113 to include this tool. >>> >>> I plan to do KIP-112 and KIP-113 separately to make each KIP and their >>> patch easier to review. I don't have any plan about which release to have >>> these KIPs. My plan is to both of them ASAP. Is there particular timeline >>> you prefer for code of these two KIPs to checked-in? >>> >>> >>>> > >>>> > > >>>> > > 6. Metrics: Could we add some metrics to show offline directories? >>>> > > >>>> > >>>> > Sure. I think it makes sense to have each broker report its number of >>>> > offline replicas and offline log directories. The previous metric was >>>> put >>>> > in KIP-113. I just added both metrics in KIP-112. >>>> > >>>> > >>>> > > >>>> > > 7. There are still references to kafka-log-dirs.sh. Are they valid? >>>> > > >>>> > >>>> > My bad. I just removed this from "Changes in Operational Procedures" >>>> and >>>> > "Test Plan" in the KIP. >>>> > >>>> > >>>> > > >>>> > > 8. Do you think KIP-113 is ready for review? One thing that KIP-113 >>>> > > mentions during partition reassignment is to first send >>>> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It seems >>>> it's >>>> > > better if the replicas are created in the right log directory in the >>>> > first >>>> > > place? The reason that I brought it up here is because it may >>>> affect the >>>> > > protocol of LeaderAndIsrRequest. >>>> > > >>>> > >>>> > Yes, KIP-113 is ready for review. The advantage of the current design >>>> is >>>> > that we can keep LeaderAndIsrRequest log-direcotry-agnostic. The >>>> > implementation would be much easier to read if all log related logic >>>> (e.g. 
>>>> > various errors) are put in ChangeReplicadIRrequest and the code path >>>> of >>>> > handling replica movement is separated from leadership handling. >>>> > >>>> > In other words, I think Kafka may be easier to develop in the long >>>> term if >>>> > we separate these two requests. >>>> > >>>> > I agree that ideally we want to create replicas in the right log >>>> directory >>>> > in the first place. But I am not sure if there is any performance or >>>> > correctness concern with the existing way of moving it after it is >>>> created. >>>> > Besides, does this decision affect the change proposed in KIP-112? >>>> > >>>> > >>>> I am just wondering if you have considered including the log directory >>>> for >>>> the replicas in the LeaderAndIsrRequest. >>>> >>>> >>> Yeah I have thought about this idea, but only briefly. I rejected this >>> idea because log directory is broker's local information and I prefer not >>> to expose local config information to the cluster through >>> LeaderAndIsrRequest. >>> >>> >>>> 9. Could you describe when the offline replicas due to log directory >>>> failure are removed from the replica fetch threads? >>>> >>> >>> Yes. If the offline replica was a leader, either a new leader is elected >>> or all follower brokers will stop fetching for this partition. If the >>> offline replica is a follower, the broker will stop fetching for this >>> replica immediately. A broker stops fetching data for a replica by removing >>> the replica from the replica fetch threads. I have updated the KIP to >>> clarify it. >>> >>> >>>> >>>> 10. The wiki mentioned changing the log directory to a file for >>>> simulating >>>> disk failure in system tests. Could we just change the permission of the >>>> log directory to 000 to simulate that? >>>> >>> >>> >>> Sure, >>> >>> >>>> >>>> Thanks, >>>> >>>> Jun >>>> >>>> >>>> > > Jun >>>> > > >>>> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin <lindon...@gmail.com> >>>> wrote: >>>> > > >>>> > > > Hi Jun, >>>> > > > >>>> > > > Can I replace zookeeper access with direct RPC for both ISR >>>> > notification >>>> > > > and disk failure notification in a future KIP, or do you feel we >>>> should >>>> > > do >>>> > > > it in this KIP? >>>> > > > >>>> > > > Hi Eno, Grant and everyone, >>>> > > > >>>> > > > Is there further improvement you would like to see with this KIP? >>>> > > > >>>> > > > Thanks you all for the comments, >>>> > > > >>>> > > > Dong >>>> > > > >>>> > > > >>>> > > > >>>> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin <lindon...@gmail.com> >>>> wrote: >>>> > > > >>>> > > > > >>>> > > > > >>>> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe < >>>> cmcc...@apache.org> >>>> > > wrote: >>>> > > > > >>>> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote: >>>> > > > >> > Thanks for all the comments Colin! >>>> > > > >> > >>>> > > > >> > To answer your questions: >>>> > > > >> > - Yes, a broker will shutdown if all its log directories are >>>> bad. >>>> > > > >> >>>> > > > >> That makes sense. Can you add this to the writeup? >>>> > > > >> >>>> > > > > >>>> > > > > Sure. This has already been added. You can find it here >>>> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversio >>>> n.action >>>> > ? >>>> > > > pageId=67638402&selectedPageVersions=9&selectedPageVersions=10> >>>> > > > > . 
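To make the behavior discussed just above concrete, here is a minimal sketch of the rule: a log directory is assumed good until an IOException is observed while accessing it, and the broker shuts itself down once every log directory is offline. The class and method names below are made up for illustration only; this is not the actual broker code.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only -- not Kafka's implementation.
public class LogDirFailureTracker {
    private final Set<String> goodLogDirs = ConcurrentHashMap.newKeySet();

    public LogDirFailureTracker(Set<String> configuredLogDirs) {
        // Every configured log directory is assumed good at startup.
        goodLogDirs.addAll(configuredLogDirs);
    }

    // Called whenever an IOException is hit while accessing a log directory.
    public synchronized void markOffline(String logDir, IOException cause) {
        if (goodLogDirs.remove(logDir)) {
            System.err.println("Marking log directory offline: " + logDir + " due to " + cause);
            // Per the KIP, the broker would also mark all replicas on this
            // directory as offline and notify the controller.
        }
        if (goodLogDirs.isEmpty()) {
            // All log directories are bad, so the broker brings itself down.
            System.err.println("All log directories are offline, shutting down broker");
            System.exit(1);
        }
    }

    // New replicas are only created on directories that are still good.
    public Set<String> onlineLogDirs() {
        return new HashSet<>(goodLogDirs);
    }
}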
>>>> > > > > >>>> > > > > >>>> > > > >> >>>> > > > >> > - I updated the KIP to explicitly state that a log directory >>>> will >>>> > be >>>> > > > >> > assumed to be good until broker sees IOException when it >>>> tries to >>>> > > > access >>>> > > > >> > the log directory. >>>> > > > >> >>>> > > > >> Thanks. >>>> > > > >> >>>> > > > >> > - Controller doesn't explicitly know whether there is new log >>>> > > > directory >>>> > > > >> > or >>>> > > > >> > not. All controller knows is whether replicas are online or >>>> > offline >>>> > > > >> based >>>> > > > >> > on LeaderAndIsrResponse. According to the existing Kafka >>>> > > > implementation, >>>> > > > >> > controller will always send LeaderAndIsrRequest to a broker >>>> after >>>> > it >>>> > > > >> > bounces. >>>> > > > >> >>>> > > > >> I thought so. It's good to clarify, though. Do you think it's >>>> > worth >>>> > > > >> adding a quick discussion of this on the wiki? >>>> > > > >> >>>> > > > > >>>> > > > > Personally I don't think it is needed. If broker starts with no >>>> bad >>>> > log >>>> > > > > directory, everything should work it is and we should not need >>>> to >>>> > > clarify >>>> > > > > it. The KIP has already covered the scenario when a broker >>>> starts >>>> > with >>>> > > > bad >>>> > > > > log directory. Also, the KIP doesn't claim or hint that we >>>> support >>>> > > > dynamic >>>> > > > > addition of new log directories. I think we are good. >>>> > > > > >>>> > > > > >>>> > > > >> best, >>>> > > > >> Colin >>>> > > > >> >>>> > > > >> > >>>> > > > >> > Please see this >>>> > > > >> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversio >>>> > > > >> n.action?pageId=67638402&selectedPageVersions=9& >>>> > > > selectedPageVersions=10> >>>> > > > >> > for the change of the KIP. >>>> > > > >> > >>>> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe < >>>> cmcc...@apache.org >>>> > > >>>> > > > >> wrote: >>>> > > > >> > >>>> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote: >>>> > > > >> > > > Thanks, Dong L. >>>> > > > >> > > > >>>> > > > >> > > > Do we plan on bringing down the broker process when all >>>> log >>>> > > > >> directories >>>> > > > >> > > > are offline? >>>> > > > >> > > > >>>> > > > >> > > > Can you explicitly state on the KIP that the log dirs >>>> are all >>>> > > > >> considered >>>> > > > >> > > > good after the broker process is bounced? It seems like >>>> an >>>> > > > >> important >>>> > > > >> > > > thing to be clear about. Also, perhaps discuss how the >>>> > > controller >>>> > > > >> > > > becomes aware of the newly good log directories after a >>>> broker >>>> > > > >> bounce >>>> > > > >> > > > (and whether this triggers re-election). >>>> > > > >> > > >>>> > > > >> > > I meant to write, all the log dirs where the broker can >>>> still >>>> > read >>>> > > > the >>>> > > > >> > > index and some other files. Clearly, log dirs that are >>>> > completely >>>> > > > >> > > inaccessible will still be considered bad after a broker >>>> process >>>> > > > >> bounce. >>>> > > > >> > > >>>> > > > >> > > best, >>>> > > > >> > > Colin >>>> > > > >> > > >>>> > > > >> > > > >>>> > > > >> > > > +1 (non-binding) aside from that >>>> > > > >> > > > >>>> > > > >> > > > >>>> > > > >> > > > >>>> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote: >>>> > > > >> > > > > Hi all, >>>> > > > >> > > > > >>>> > > > >> > > > > Thank you all for the helpful suggestion. 
I have >>>> updated the >>>> > > KIP >>>> > > > >> to >>>> > > > >> > > > > address >>>> > > > >> > > > > the comments received so far. See here >>>> > > > >> > > > > <https://cwiki.apache.org/conf >>>> > luence/pages/diffpagesbyversio >>>> > > > >> n.action? >>>> > > > >> > > pageId=67638402&selectedPageVe >>>> rsions=8&selectedPageVersions= >>>> > 9>to >>>> > > > >> > > > > read the changes of the KIP. Here is a summary of >>>> change: >>>> > > > >> > > > > >>>> > > > >> > > > > - Updated the Proposed Change section to change the >>>> recovery >>>> > > > >> steps. >>>> > > > >> > > After >>>> > > > >> > > > > this change, broker will also create replica as long >>>> as all >>>> > > log >>>> > > > >> > > > > directories >>>> > > > >> > > > > are working. >>>> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since user no >>>> > longer >>>> > > > >> needs to >>>> > > > >> > > > > use >>>> > > > >> > > > > it for recovery from bad disks. >>>> > > > >> > > > > - Explained how the znode controller_managed_state is >>>> > managed >>>> > > in >>>> > > > >> the >>>> > > > >> > > > > Public >>>> > > > >> > > > > interface section. >>>> > > > >> > > > > - Explained what happens during controller failover, >>>> > partition >>>> > > > >> > > > > reassignment >>>> > > > >> > > > > and topic deletion in the Proposed Change section. >>>> > > > >> > > > > - Updated Future Work section to include the following >>>> > > potential >>>> > > > >> > > > > improvements >>>> > > > >> > > > > - Let broker notify controller of ISR change and disk >>>> > state >>>> > > > >> change >>>> > > > >> > > via >>>> > > > >> > > > > RPC instead of using zookeeper >>>> > > > >> > > > > - Handle various failure scenarios (e.g. slow disk) >>>> on a >>>> > > > >> case-by-case >>>> > > > >> > > > > basis. For example, we may want to detect slow disk and >>>> > > consider >>>> > > > >> it as >>>> > > > >> > > > > offline. >>>> > > > >> > > > > - Allow admin to mark a directory as bad so that it >>>> will >>>> > not >>>> > > > be >>>> > > > >> used. >>>> > > > >> > > > > >>>> > > > >> > > > > Thanks, >>>> > > > >> > > > > Dong >>>> > > > >> > > > > >>>> > > > >> > > > > >>>> > > > >> > > > > >>>> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin < >>>> > lindon...@gmail.com >>>> > > > >>>> > > > >> wrote: >>>> > > > >> > > > > >>>> > > > >> > > > > > Hey Eno, >>>> > > > >> > > > > > >>>> > > > >> > > > > > Thanks much for the comment! >>>> > > > >> > > > > > >>>> > > > >> > > > > > I still think the complexity added to Kafka is >>>> justified >>>> > by >>>> > > > its >>>> > > > >> > > benefit. >>>> > > > >> > > > > > Let me provide my reasons below. >>>> > > > >> > > > > > >>>> > > > >> > > > > > 1) The additional logic is easy to understand and >>>> thus its >>>> > > > >> complexity >>>> > > > >> > > > > > should be reasonable. >>>> > > > >> > > > > > >>>> > > > >> > > > > > On the broker side, it needs to catch exception when >>>> > access >>>> > > > log >>>> > > > >> > > directory, >>>> > > > >> > > > > > mark log directory and all its replicas as offline, >>>> notify >>>> > > > >> > > controller by >>>> > > > >> > > > > > writing the zookeeper notification path, and specify >>>> error >>>> > > in >>>> > > > >> > > > > > LeaderAndIsrResponse. 
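As a rough illustration of the broker-side notification step just described: the broker writes a sequential notification znode that the controller watches. The znode path and payload format below are assumptions made for this example, not necessarily the exact names used in the KIP.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.nio.charset.StandardCharsets;

// Sketch only: the parent path and JSON payload are assumed names for illustration.
public class LogDirFailureNotifier {
    private static final String NOTIFICATION_PARENT = "/log_dir_event_notification";

    private final ZooKeeper zk;
    private final int brokerId;

    public LogDirFailureNotifier(ZooKeeper zk, int brokerId) {
        this.zk = zk;
        this.brokerId = brokerId;
    }

    // Creates a sequential child znode under the (pre-created) notification path,
    // so the controller, which watches that path, learns that this broker has an
    // offline log directory and can react accordingly.
    public void notifyControllerOfOfflineLogDir() throws KeeperException, InterruptedException {
        byte[] payload = ("{\"version\":1,\"broker\":" + brokerId + "}")
                .getBytes(StandardCharsets.UTF_8);
        zk.create(NOTIFICATION_PARENT + "/log_dir_event_",
                  payload,
                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
                  CreateMode.PERSISTENT_SEQUENTIAL);
    }
}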
On the controller side, it will >>>> > > listener >>>> > > > >> to >>>> > > > >> > > > > > zookeeper for disk failure notification, learn about >>>> > offline >>>> > > > >> > > replicas in >>>> > > > >> > > > > > the LeaderAndIsrResponse, and take offline replicas >>>> into >>>> > > > >> > > consideration when >>>> > > > >> > > > > > electing leaders. It also mark replica as created in >>>> > > zookeeper >>>> > > > >> and >>>> > > > >> > > use it >>>> > > > >> > > > > > to determine whether a replica is created. >>>> > > > >> > > > > > >>>> > > > >> > > > > > That is all the logic we need to add in Kafka. I >>>> > personally >>>> > > > feel >>>> > > > >> > > this is >>>> > > > >> > > > > > easy to reason about. >>>> > > > >> > > > > > >>>> > > > >> > > > > > 2) The additional code is not much. >>>> > > > >> > > > > > >>>> > > > >> > > > > > I expect the code for KIP-112 to be around 1100 >>>> lines new >>>> > > > code. >>>> > > > >> > > Previously >>>> > > > >> > > > > > I have implemented a prototype of a slightly >>>> different >>>> > > design >>>> > > > >> (see >>>> > > > >> > > here >>>> > > > >> > > > > > <https://docs.google.com/docum >>>> ent/d/1Izza0SBmZMVUBUt9s_ >>>> > > > >> > > -Dqi3D8e0KGJQYW8xgEdRsgAI/edit>) >>>> > > > >> > > > > > and uploaded it to github (see here >>>> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD>). The >>>> > patch >>>> > > > >> changed >>>> > > > >> > > 33 >>>> > > > >> > > > > > files, added 1185 lines and deleted 183 lines. The >>>> size of >>>> > > > >> prototype >>>> > > > >> > > patch >>>> > > > >> > > > > > is actually smaller than patch of KIP-107 (see here >>>> > > > >> > > > > > <https://github.com/apache/kafka/pull/2476>) which >>>> is >>>> > > already >>>> > > > >> > > accepted. >>>> > > > >> > > > > > The KIP-107 patch changed 49 files, added 1349 lines >>>> and >>>> > > > >> deleted 141 >>>> > > > >> > > lines. >>>> > > > >> > > > > > >>>> > > > >> > > > > > 3) Comparison with one-broker-per-multiple-volumes >>>> > > > >> > > > > > >>>> > > > >> > > > > > This KIP can improve the availability of Kafka in >>>> this >>>> > case >>>> > > > such >>>> > > > >> > > that one >>>> > > > >> > > > > > failed volume doesn't bring down the entire broker. >>>> > > > >> > > > > > >>>> > > > >> > > > > > 4) Comparison with one-broker-per-volume >>>> > > > >> > > > > > >>>> > > > >> > > > > > If each volume maps to multiple disks, then we still >>>> have >>>> > > > >> similar >>>> > > > >> > > problem >>>> > > > >> > > > > > such that the broker will fail if any disk of the >>>> volume >>>> > > > failed. >>>> > > > >> > > > > > >>>> > > > >> > > > > > If each volume maps to one disk, it means that we >>>> need to >>>> > > > >> deploy 10 >>>> > > > >> > > > > > brokers on a machine if the machine has 10 disks. I >>>> will >>>> > > > >> explain the >>>> > > > >> > > > > > concern with this approach in order of their >>>> importance. >>>> > > > >> > > > > > >>>> > > > >> > > > > > - It is weird if we were to tell kafka user to >>>> deploy 50 >>>> > > > >> brokers on a >>>> > > > >> > > > > > machine of 50 disks. >>>> > > > >> > > > > > >>>> > > > >> > > > > > - Either when user deploys Kafka on a commercial >>>> cloud >>>> > > > platform >>>> > > > >> or >>>> > > > >> > > when >>>> > > > >> > > > > > user deploys their own cluster, the size or largest >>>> disk >>>> > is >>>> > > > >> usually >>>> > > > >> > > > > > limited. 
There will be scenarios where user want to >>>> > increase >>>> > > > >> broker >>>> > > > >> > > > > > capacity by having multiple disks per broker. This >>>> JBOD >>>> > KIP >>>> > > > >> makes it >>>> > > > >> > > > > > feasible without hurting availability due to single >>>> disk >>>> > > > >> failure. >>>> > > > >> > > > > > >>>> > > > >> > > > > > - Automatic load rebalance across disks will be >>>> easier and >>>> > > > more >>>> > > > >> > > flexible >>>> > > > >> > > > > > if one broker has multiple disks. This can be future >>>> work. >>>> > > > >> > > > > > >>>> > > > >> > > > > > - There is performance concern when you deploy 10 >>>> broker >>>> > > vs. 1 >>>> > > > >> > > broker on >>>> > > > >> > > > > > one machine. The metadata the cluster, including >>>> > > FetchRequest, >>>> > > > >> > > > > > ProduceResponse, MetadataRequest and so on will all >>>> be 10X >>>> > > > >> more. The >>>> > > > >> > > > > > packet-per-second will be 10X higher which may limit >>>> > > > >> performance if >>>> > > > >> > > pps is >>>> > > > >> > > > > > the performance bottleneck. The number of socket on >>>> the >>>> > > > machine >>>> > > > >> is >>>> > > > >> > > 10X >>>> > > > >> > > > > > higher. And the number of replication thread will be >>>> 100X >>>> > > > more. >>>> > > > >> The >>>> > > > >> > > impact >>>> > > > >> > > > > > will be more significant with increasing number of >>>> disks >>>> > per >>>> > > > >> > > machine. Thus >>>> > > > >> > > > > > it will limit Kakfa's scalability in the long term. >>>> > > > >> > > > > > >>>> > > > >> > > > > > Thanks, >>>> > > > >> > > > > > Dong >>>> > > > >> > > > > > >>>> > > > >> > > > > > >>>> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska < >>>> > > > >> eno.there...@gmail.com >>>> > > > >> > > > >>>> > > > >> > > > > > wrote: >>>> > > > >> > > > > > >>>> > > > >> > > > > >> Hi Dong, >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> To simplify the discussion today, on my part I'll >>>> zoom >>>> > into >>>> > > > one >>>> > > > >> > > thing >>>> > > > >> > > > > >> only: >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> - I'll discuss the options called below : >>>> > > > >> "one-broker-per-disk" or >>>> > > > >> > > > > >> "one-broker-per-few-disks". >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments so >>>> there is >>>> > > no >>>> > > > >> need to >>>> > > > >> > > > > >> discuss that part for me. I buy it that JBODs are >>>> good. >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> I find the terminology can be improved a bit. >>>> Ideally >>>> > we'd >>>> > > be >>>> > > > >> > > talking >>>> > > > >> > > > > >> about volumes, not disks. Just to make it clear that >>>> > Kafka >>>> > > > >> > > understand >>>> > > > >> > > > > >> volumes/directories, not individual raw disks. So by >>>> > > > >> > > > > >> "one-broker-per-few-disks" what I mean is that the >>>> admin >>>> > > can >>>> > > > >> pool a >>>> > > > >> > > few >>>> > > > >> > > > > >> disks together to create a volume/directory and >>>> give that >>>> > > to >>>> > > > >> Kafka. >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> The kernel of my question will be that the admin >>>> already >>>> > > has >>>> > > > >> tools >>>> > > > >> > > to 1) >>>> > > > >> > > > > >> create volumes/directories from a JBOD and 2) start >>>> a >>>> > > broker >>>> > > > >> on a >>>> > > > >> > > desired >>>> > > > >> > > > > >> machine and 3) assign a broker resources like a >>>> > directory. 
>>>> > > I >>>> > > > >> claim >>>> > > > >> > > that >>>> > > > >> > > > > >> those tools are sufficient to optimise resource >>>> > allocation. >>>> > > > I >>>> > > > >> > > understand >>>> > > > >> > > > > >> that a broker could manage point 3) itself, ie >>>> juggle the >>>> > > > >> > > directories. My >>>> > > > >> > > > > >> question is whether the complexity added to Kafka is >>>> > > > justified. >>>> > > > >> > > > > >> Operationally it seems to me an admin will still >>>> have to >>>> > do >>>> > > > >> all the >>>> > > > >> > > three >>>> > > > >> > > > > >> items above. >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> Looking forward to the discussion >>>> > > > >> > > > > >> Thanks >>>> > > > >> > > > > >> Eno >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin < >>>> lindon...@gmail.com >>>> > > >>>> > > > >> wrote: >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > Hey Eno, >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > Thanks much for the review. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > I think your suggestion is to split disks of a >>>> machine >>>> > > into >>>> > > > >> > > multiple >>>> > > > >> > > > > >> disk >>>> > > > >> > > > > >> > sets and run one broker per disk set. Yeah this is >>>> > > similar >>>> > > > to >>>> > > > >> > > Colin's >>>> > > > >> > > > > >> > suggestion of one-broker-per-disk, which we have >>>> > > evaluated >>>> > > > at >>>> > > > >> > > LinkedIn >>>> > > > >> > > > > >> and >>>> > > > >> > > > > >> > considered it to be a good short term approach. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > As of now I don't think any of these approach is a >>>> > better >>>> > > > >> > > alternative in >>>> > > > >> > > > > >> > the long term. I will summarize these here. I >>>> have put >>>> > > > these >>>> > > > >> > > reasons in >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> > KIP's motivation section and rejected alternative >>>> > > section. >>>> > > > I >>>> > > > >> am >>>> > > > >> > > happy to >>>> > > > >> > > > > >> > discuss more and I would certainly like to use an >>>> > > > alternative >>>> > > > >> > > solution >>>> > > > >> > > > > >> that >>>> > > > >> > > > > >> > is easier to do with better performance. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > - JBOD vs. RAID-10: if we switch from RAID-10 with >>>> > > > >> > > > > >> replication-factoer=2 to >>>> > > > >> > > > > >> > JBOD with replicatio-factor=3, we get 25% >>>> reduction in >>>> > > disk >>>> > > > >> usage >>>> > > > >> > > and >>>> > > > >> > > > > >> > doubles the tolerance of broker failure before >>>> data >>>> > > > >> > > unavailability from >>>> > > > >> > > > > >> 1 >>>> > > > >> > > > > >> > to 2. This is pretty huge gain for any company >>>> that >>>> > uses >>>> > > > >> Kafka at >>>> > > > >> > > large >>>> > > > >> > > > > >> > scale. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit of >>>> > > > >> > > one-broker-per-disk is >>>> > > > >> > > > > >> that >>>> > > > >> > > > > >> > no major code change is needed in Kafka. 
Among the >>>> > > > >> disadvantage of >>>> > > > >> > > > > >> > one-broker-per-disk summarized in the KIP and >>>> previous >>>> > > > email >>>> > > > >> with >>>> > > > >> > > Colin, >>>> > > > >> > > > > >> > the biggest one is the 15% throughput loss >>>> compared to >>>> > > JBOD >>>> > > > >> and >>>> > > > >> > > less >>>> > > > >> > > > > >> > flexibility to balance across disks. Further, it >>>> > probably >>>> > > > >> requires >>>> > > > >> > > > > >> change >>>> > > > >> > > > > >> > to internal deployment tools at various companies >>>> to >>>> > deal >>>> > > > >> with >>>> > > > >> > > > > >> > one-broker-per-disk setup. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup that used at >>>> > > > Microsoft. >>>> > > > >> The >>>> > > > >> > > > > >> problem is >>>> > > > >> > > > > >> > that a broker becomes unavailable if any disk >>>> fail. >>>> > > Suppose >>>> > > > >> > > > > >> > replication-factor=2 and there are 10 disks per >>>> > machine. >>>> > > > >> Then the >>>> > > > >> > > > > >> > probability of of any message becomes unavailable >>>> due >>>> > to >>>> > > > disk >>>> > > > >> > > failure >>>> > > > >> > > > > >> with >>>> > > > >> > > > > >> > RAID-0 is 100X higher than that with JBOD. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks: >>>> > > > one-broker-per-few-disk >>>> > > > >> is >>>> > > > >> > > > > >> somewhere >>>> > > > >> > > > > >> > between one-broker-per-disk and RAID-0. So it >>>> carries >>>> > an >>>> > > > >> averaged >>>> > > > >> > > > > >> > disadvantages of these two approaches. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > To answer your question regarding, I think it is >>>> > > reasonable >>>> > > > >> to >>>> > > > >> > > mange >>>> > > > >> > > > > >> disk >>>> > > > >> > > > > >> > in Kafka. By "managing disks" we mean the >>>> management of >>>> > > > >> > > assignment of >>>> > > > >> > > > > >> > replicas across disks. Here are my reasons in more >>>> > > detail: >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > - I don't think this KIP is a big step change. By >>>> > > allowing >>>> > > > >> user to >>>> > > > >> > > > > >> > configure Kafka to run multiple log directories or >>>> > disks >>>> > > as >>>> > > > >> of >>>> > > > >> > > now, it >>>> > > > >> > > > > >> is >>>> > > > >> > > > > >> > implicit that Kafka manages disks. It is just not >>>> a >>>> > > > complete >>>> > > > >> > > feature. >>>> > > > >> > > > > >> > Microsoft and probably other companies are using >>>> this >>>> > > > feature >>>> > > > >> > > under the >>>> > > > >> > > > > >> > undesirable effect that a broker will fail any if >>>> any >>>> > > disk >>>> > > > >> fail. >>>> > > > >> > > It is >>>> > > > >> > > > > >> good >>>> > > > >> > > > > >> > to complete this feature. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > - I think it is reasonable to manage disk in >>>> Kafka. One >>>> > > of >>>> > > > >> the >>>> > > > >> > > most >>>> > > > >> > > > > >> > important work that Kafka is doing is to >>>> determine the >>>> > > > >> replica >>>> > > > >> > > > > >> assignment >>>> > > > >> > > > > >> > across brokers and make sure enough copies of a >>>> given >>>> > > > >> replica is >>>> > > > >> > > > > >> available. >>>> > > > >> > > > > >> > I would argue that it is not much different than >>>> > > > determining >>>> > > > >> the >>>> > > > >> > > replica >>>> > > > >> > > > > >> > assignment across disk conceptually. 
>>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > - I would agree that this KIP is improve >>>> performance of >>>> > > > >> Kafka at >>>> > > > >> > > the >>>> > > > >> > > > > >> cost >>>> > > > >> > > > > >> > of more complexity inside Kafka, by switching from >>>> > > RAID-10 >>>> > > > to >>>> > > > >> > > JBOD. I >>>> > > > >> > > > > >> would >>>> > > > >> > > > > >> > argue that this is a right direction. If we can >>>> gain >>>> > 20%+ >>>> > > > >> > > performance by >>>> > > > >> > > > > >> > managing NIC in Kafka as compared to existing >>>> approach >>>> > > and >>>> > > > >> other >>>> > > > >> > > > > >> > alternatives, I would say we should just do it. >>>> Such a >>>> > > gain >>>> > > > >> in >>>> > > > >> > > > > >> performance, >>>> > > > >> > > > > >> > or equivalently reduction in cost, can save >>>> millions of >>>> > > > >> dollars >>>> > > > >> > > per year >>>> > > > >> > > > > >> > for any company running Kafka at large scale. >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > Thanks, >>>> > > > >> > > > > >> > Dong >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska < >>>> > > > >> > > eno.there...@gmail.com> >>>> > > > >> > > > > >> wrote: >>>> > > > >> > > > > >> > >>>> > > > >> > > > > >> >> I'm coming somewhat late to the discussion, >>>> apologies >>>> > > for >>>> > > > >> that. >>>> > > > >> > > > > >> >> >>>> > > > >> > > > > >> >> I'm worried about this proposal. It's moving >>>> Kafka to >>>> > a >>>> > > > >> world >>>> > > > >> > > where it >>>> > > > >> > > > > >> >> manages disks. So in a sense, the scope of the >>>> KIP is >>>> > > > >> limited, >>>> > > > >> > > but the >>>> > > > >> > > > > >> >> direction it sets for Kafka is quite a big step >>>> > change. >>>> > > > >> > > Fundamentally >>>> > > > >> > > > > >> this >>>> > > > >> > > > > >> >> is about balancing resources for a Kafka broker. >>>> This >>>> > > can >>>> > > > be >>>> > > > >> > > done by a >>>> > > > >> > > > > >> >> tool, rather than by changing Kafka. E.g., the >>>> tool >>>> > > would >>>> > > > >> take a >>>> > > > >> > > bunch >>>> > > > >> > > > > >> of >>>> > > > >> > > > > >> >> disks together, create a volume over them and >>>> export >>>> > > that >>>> > > > >> to a >>>> > > > >> > > Kafka >>>> > > > >> > > > > >> broker >>>> > > > >> > > > > >> >> (in addition to setting the memory limits for >>>> that >>>> > > broker >>>> > > > or >>>> > > > >> > > limiting >>>> > > > >> > > > > >> other >>>> > > > >> > > > > >> >> resources). A different bunch of disks can then >>>> make >>>> > up >>>> > > a >>>> > > > >> second >>>> > > > >> > > > > >> volume, >>>> > > > >> > > > > >> >> and be used by another Kafka broker. This is >>>> aligned >>>> > > with >>>> > > > >> what >>>> > > > >> > > Colin is >>>> > > > >> > > > > >> >> saying (as I understand it). >>>> > > > >> > > > > >> >> >>>> > > > >> > > > > >> >> Disks are not the only resource on a machine, >>>> there >>>> > are >>>> > > > >> several >>>> > > > >> > > > > >> instances >>>> > > > >> > > > > >> >> where multiple NICs are used for example. Do we >>>> want >>>> > > fine >>>> > > > >> grained >>>> > > > >> > > > > >> >> management of all these resources? I'd argue that >>>> > opens >>>> > > us >>>> > > > >> the >>>> > > > >> > > system >>>> > > > >> > > > > >> to a >>>> > > > >> > > > > >> >> lot of complexity. 
>>>> > > > >> > > > > >> >> >>>> > > > >> > > > > >> >> Thanks >>>> > > > >> > > > > >> >> Eno >>>> > > > >> > > > > >> >> >>>> > > > >> > > > > >> >> >>>> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin < >>>> > lindon...@gmail.com >>>> > > > >>>> > > > >> wrote: >>>> > > > >> > > > > >> >>> >>>> > > > >> > > > > >> >>> Hi all, >>>> > > > >> > > > > >> >>> >>>> > > > >> > > > > >> >>> I am going to initiate the vote If there is no >>>> > further >>>> > > > >> concern >>>> > > > >> > > with >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> >> KIP. >>>> > > > >> > > > > >> >>> >>>> > > > >> > > > > >> >>> Thanks, >>>> > > > >> > > > > >> >>> Dong >>>> > > > >> > > > > >> >>> >>>> > > > >> > > > > >> >>> >>>> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai < >>>> > > > >> > > radai.rosenbl...@gmail.com> >>>> > > > >> > > > > >> >> wrote: >>>> > > > >> > > > > >> >>> >>>> > > > >> > > > > >> >>>> a few extra points: >>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >>>> 1. broker per disk might also incur more >>>> client <--> >>>> > > > >> broker >>>> > > > >> > > sockets: >>>> > > > >> > > > > >> >>>> suppose every producer / consumer "talks" to >1 >>>> > > > partition, >>>> > > > >> > > there's a >>>> > > > >> > > > > >> >> very >>>> > > > >> > > > > >> >>>> good chance that partitions that were >>>> co-located on >>>> > a >>>> > > > >> single >>>> > > > >> > > 10-disk >>>> > > > >> > > > > >> >> broker >>>> > > > >> > > > > >> >>>> would now be split between several single-disk >>>> > broker >>>> > > > >> > > processes on >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> >> same >>>> > > > >> > > > > >> >>>> machine. hard to put a multiplier on this, but >>>> > likely >>>> > > > >x1. >>>> > > > >> > > sockets >>>> > > > >> > > > > >> are a >>>> > > > >> > > > > >> >>>> limited resource at the OS level and incur some >>>> > memory >>>> > > > >> cost >>>> > > > >> > > (kernel >>>> > > > >> > > > > >> >>>> buffers) >>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >>>> 2. there's a memory overhead to spinning up a >>>> JVM >>>> > > > >> (compiled >>>> > > > >> > > code and >>>> > > > >> > > > > >> >> byte >>>> > > > >> > > > > >> >>>> code objects etc). if we assume this overhead >>>> is >>>> > ~300 >>>> > > MB >>>> > > > >> > > (order of >>>> > > > >> > > > > >> >>>> magnitude, specifics vary) than spinning up 10 >>>> JVMs >>>> > > > would >>>> > > > >> lose >>>> > > > >> > > you 3 >>>> > > > >> > > > > >> GB >>>> > > > >> > > > > >> >> of >>>> > > > >> > > > > >> >>>> RAM. not a ton, but non negligible. >>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >>>> 3. there would also be some overhead >>>> downstream of >>>> > > kafka >>>> > > > >> in any >>>> > > > >> > > > > >> >> management >>>> > > > >> > > > > >> >>>> / monitoring / log aggregation system. likely >>>> less >>>> > > than >>>> > > > >> x10 >>>> > > > >> > > though. >>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >>>> 4. (related to above) - added complexity of >>>> > > > administration >>>> > > > >> > > with more >>>> > > > >> > > > > >> >>>> running instances. >>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >>>> is anyone running kafka with anywhere near >>>> 100GB >>>> > > heaps? >>>> > > > i >>>> > > > >> > > thought the >>>> > > > >> > > > > >> >> point >>>> > > > >> > > > > >> >>>> was to rely on kernel page cache to do the disk >>>> > > > buffering >>>> > > > >> .... 
>>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin < >>>> > > > >> > > lindon...@gmail.com> >>>> > > > >> > > > > >> wrote: >>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >>>>> Hey Colin, >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> Thanks much for the comment. Please see me >>>> comment >>>> > > > >> inline. >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin >>>> McCabe < >>>> > > > >> > > cmcc...@apache.org> >>>> > > > >> > > > > >> >>>> wrote: >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin >>>> wrote: >>>> > > > >> > > > > >> >>>>>>> Hey Colin, >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> Good point! Yeah we have actually >>>> considered and >>>> > > > >> tested this >>>> > > > >> > > > > >> >>>> solution, >>>> > > > >> > > > > >> >>>>>>> which we call one-broker-per-disk. It would >>>> work >>>> > > and >>>> > > > >> should >>>> > > > >> > > > > >> require >>>> > > > >> > > > > >> >>>> no >>>> > > > >> > > > > >> >>>>>>> major change in Kafka as compared to this >>>> JBOD >>>> > KIP. >>>> > > > So >>>> > > > >> it >>>> > > > >> > > would >>>> > > > >> > > > > >> be a >>>> > > > >> > > > > >> >>>>> good >>>> > > > >> > > > > >> >>>>>>> short term solution. >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> But it has a few drawbacks which makes it >>>> less >>>> > > > >> desirable in >>>> > > > >> > > the >>>> > > > >> > > > > >> long >>>> > > > >> > > > > >> >>>>>>> term. >>>> > > > >> > > > > >> >>>>>>> Assume we have 10 disks on a machine. Here >>>> are >>>> > the >>>> > > > >> problems: >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>> Hi Dong, >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>> Thanks for the thoughtful reply. >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> 1) Our stress test result shows that >>>> > > > >> one-broker-per-disk >>>> > > > >> > > has 15% >>>> > > > >> > > > > >> >>>> lower >>>> > > > >> > > > > >> >>>>>>> throughput >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> 2) Controller would need to send 10X as many >>>> > > > >> > > LeaderAndIsrRequest, >>>> > > > >> > > > > >> >>>>>>> MetadataUpdateRequest and >>>> StopReplicaRequest. >>>> > This >>>> > > > >> > > increases the >>>> > > > >> > > > > >> >>>> burden >>>> > > > >> > > > > >> >>>>>>> on >>>> > > > >> > > > > >> >>>>>>> controller which can be the performance >>>> > bottleneck. >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>> Maybe I'm misunderstanding something, but >>>> there >>>> > > would >>>> > > > >> not be >>>> > > > >> > > 10x as >>>> > > > >> > > > > >> >>>> many >>>> > > > >> > > > > >> >>>>>> StopReplicaRequest RPCs, would there? The >>>> other >>>> > > > >> requests >>>> > > > >> > > would >>>> > > > >> > > > > >> >>>> increase >>>> > > > >> > > > > >> >>>>>> 10x, but from a pretty low base, right? We >>>> are >>>> > not >>>> > > > >> > > reassigning >>>> > > > >> > > > > >> >>>>>> partitions all the time, I hope (or else we >>>> have >>>> > > > bigger >>>> > > > >> > > > > >> problems...) 
>>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> I think the controller will group >>>> > StopReplicaRequest >>>> > > > per >>>> > > > >> > > broker and >>>> > > > >> > > > > >> >> send >>>> > > > >> > > > > >> >>>>> only one StopReplicaRequest to a broker during >>>> > > > controlled >>>> > > > >> > > shutdown. >>>> > > > >> > > > > >> >>>> Anyway, >>>> > > > >> > > > > >> >>>>> we don't have to worry about this if we agree >>>> that >>>> > > > other >>>> > > > >> > > requests >>>> > > > >> > > > > >> will >>>> > > > >> > > > > >> >>>>> increase by 10X. One MetadataRequest to send >>>> to >>>> > each >>>> > > > >> broker >>>> > > > >> > > in the >>>> > > > >> > > > > >> >>>> cluster >>>> > > > >> > > > > >> >>>>> every time there is leadership change. I am >>>> not >>>> > sure >>>> > > > >> this is >>>> > > > >> > > a real >>>> > > > >> > > > > >> >>>>> problem. But in theory this makes the overhead >>>> > > > complexity >>>> > > > >> > > O(number >>>> > > > >> > > > > >> of >>>> > > > >> > > > > >> >>>>> broker) and may be a concern in the future. >>>> Ideally >>>> > > we >>>> > > > >> should >>>> > > > >> > > avoid >>>> > > > >> > > > > >> it. >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> 3) Less efficient use of physical resource >>>> on the >>>> > > > >> machine. >>>> > > > >> > > The >>>> > > > >> > > > > >> number >>>> > > > >> > > > > >> >>>>> of >>>> > > > >> > > > > >> >>>>>>> socket on each machine will increase by >>>> 10X. The >>>> > > > >> number of >>>> > > > >> > > > > >> connection >>>> > > > >> > > > > >> >>>>>>> between any two machine will increase by >>>> 100X. >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> 4) Less efficient way to management memory >>>> and >>>> > > quota. >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> 5) Rebalance between disks/brokers on the >>>> same >>>> > > > machine >>>> > > > >> will >>>> > > > >> > > less >>>> > > > >> > > > > >> >>>>>>> efficient >>>> > > > >> > > > > >> >>>>>>> and less flexible. Broker has to read data >>>> from >>>> > > > another >>>> > > > >> > > broker on >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> >>>>>>> same >>>> > > > >> > > > > >> >>>>>>> machine via socket. It is also harder to do >>>> > > automatic >>>> > > > >> load >>>> > > > >> > > balance >>>> > > > >> > > > > >> >>>>>>> between >>>> > > > >> > > > > >> >>>>>>> disks on the same machine in the future. >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> I will put this and the explanation in the >>>> > rejected >>>> > > > >> > > alternative >>>> > > > >> > > > > >> >>>>> section. >>>> > > > >> > > > > >> >>>>>>> I >>>> > > > >> > > > > >> >>>>>>> have a few questions: >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> - Can you explain why this solution can help >>>> > avoid >>>> > > > >> > > scalability >>>> > > > >> > > > > >> >>>>>>> bottleneck? >>>> > > > >> > > > > >> >>>>>>> I actually think it will exacerbate the >>>> > scalability >>>> > > > >> problem >>>> > > > >> > > due >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> >>>> 2) >>>> > > > >> > > > > >> >>>>>>> above. >>>> > > > >> > > > > >> >>>>>>> - Why can we push more RPC with this >>>> solution? 
>>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>> To really answer this question we'd have to >>>> take a >>>> > > > deep >>>> > > > >> dive >>>> > > > >> > > into >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> >>>>>> locking of the broker and figure out how >>>> > effectively >>>> > > > it >>>> > > > >> can >>>> > > > >> > > > > >> >> parallelize >>>> > > > >> > > > > >> >>>>>> truly independent requests. Almost every >>>> > > > multithreaded >>>> > > > >> > > process is >>>> > > > >> > > > > >> >>>> going >>>> > > > >> > > > > >> >>>>>> to have shared state, like shared queues or >>>> shared >>>> > > > >> sockets, >>>> > > > >> > > that is >>>> > > > >> > > > > >> >>>>>> going to make scaling less than linear when >>>> you >>>> > add >>>> > > > >> disks or >>>> > > > >> > > > > >> >>>> processors. >>>> > > > >> > > > > >> >>>>>> (And clearly, another option is to improve >>>> that >>>> > > > >> scalability, >>>> > > > >> > > rather >>>> > > > >> > > > > >> >>>>>> than going multi-process!) >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> Yeah I also think it is better to improve >>>> > scalability >>>> > > > >> inside >>>> > > > >> > > kafka >>>> > > > >> > > > > >> code >>>> > > > >> > > > > >> >>>> if >>>> > > > >> > > > > >> >>>>> possible. I am not sure we currently have any >>>> > > > scalability >>>> > > > >> > > issue >>>> > > > >> > > > > >> inside >>>> > > > >> > > > > >> >>>>> Kafka that can not be removed without using >>>> > > > >> multi-process. >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>>> - It is true that a garbage collection in >>>> one >>>> > > broker >>>> > > > >> would >>>> > > > >> > > not >>>> > > > >> > > > > >> affect >>>> > > > >> > > > > >> >>>>>>> others. But that is after every broker only >>>> uses >>>> > > 1/10 >>>> > > > >> of the >>>> > > > >> > > > > >> memory. >>>> > > > >> > > > > >> >>>>> Can >>>> > > > >> > > > > >> >>>>>>> we be sure that this will actually help >>>> > > performance? >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>> The big question is, how much memory do Kafka >>>> > > brokers >>>> > > > >> use >>>> > > > >> > > now, and >>>> > > > >> > > > > >> how >>>> > > > >> > > > > >> >>>>>> much will they use in the future? Our >>>> experience >>>> > in >>>> > > > >> HDFS >>>> > > > >> > > was that >>>> > > > >> > > > > >> >> once >>>> > > > >> > > > > >> >>>>>> you start getting more than 100-200GB Java >>>> heap >>>> > > sizes, >>>> > > > >> full >>>> > > > >> > > GCs >>>> > > > >> > > > > >> start >>>> > > > >> > > > > >> >>>>>> taking minutes to finish when using the >>>> standard >>>> > > JVMs. >>>> > > > >> That >>>> > > > >> > > alone >>>> > > > >> > > > > >> is >>>> > > > >> > > > > >> >> a >>>> > > > >> > > > > >> >>>>>> good reason to go multi-process or consider >>>> > storing >>>> > > > more >>>> > > > >> > > things off >>>> > > > >> > > > > >> >> the >>>> > > > >> > > > > >> >>>>>> Java heap. >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> I see. 
Now I agree one-broker-per-disk should >>>> be >>>> > more >>>> > > > >> > > efficient in >>>> > > > >> > > > > >> >> terms >>>> > > > >> > > > > >> >>>> of >>>> > > > >> > > > > >> >>>>> GC since each broker probably needs less than >>>> 1/10 >>>> > of >>>> > > > the >>>> > > > >> > > memory >>>> > > > >> > > > > >> >>>> available >>>> > > > >> > > > > >> >>>>> on a typical machine nowadays. I will remove >>>> this >>>> > > from >>>> > > > >> the >>>> > > > >> > > reason of >>>> > > > >> > > > > >> >>>>> rejection. >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>> Disk failure is the "easy" case. The "hard" >>>> case, >>>> > > > >> which is >>>> > > > >> > > > > >> >>>>>> unfortunately also the much more common >>>> case, is >>>> > > disk >>>> > > > >> > > misbehavior. >>>> > > > >> > > > > >> >>>>>> Towards the end of their lives, disks tend to >>>> > start >>>> > > > >> slowing >>>> > > > >> > > down >>>> > > > >> > > > > >> >>>>>> unpredictably. Requests that would have >>>> completed >>>> > > > >> > > immediately >>>> > > > >> > > > > >> before >>>> > > > >> > > > > >> >>>>>> start taking 20, 100 500 milliseconds. Some >>>> files >>>> > > may >>>> > > > >> be >>>> > > > >> > > readable >>>> > > > >> > > > > >> and >>>> > > > >> > > > > >> >>>>>> other files may not be. System calls hang, >>>> > > sometimes >>>> > > > >> > > forever, and >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> >>>>>> Java process can't abort them, because the >>>> hang is >>>> > > in >>>> > > > >> the >>>> > > > >> > > kernel. >>>> > > > >> > > > > >> It >>>> > > > >> > > > > >> >>>> is >>>> > > > >> > > > > >> >>>>>> not fun when threads are stuck in "D state" >>>> > > > >> > > > > >> >>>>>> http://stackoverflow.com/quest >>>> > > > >> ions/20423521/process-perminan >>>> > > > >> > > > > >> >>>>>> tly-stuck-on-d-state >>>> > > > >> > > > > >> >>>>>> . Even kill -9 cannot abort the thread then. >>>> > > > >> Fortunately, >>>> > > > >> > > this is >>>> > > > >> > > > > >> >>>>>> rare. >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> I agree it is a harder problem and it is >>>> rare. We >>>> > > > >> probably >>>> > > > >> > > don't >>>> > > > >> > > > > >> have >>>> > > > >> > > > > >> >> to >>>> > > > >> > > > > >> >>>>> worry about it in this KIP since this issue is >>>> > > > >> orthogonal to >>>> > > > >> > > > > >> whether or >>>> > > > >> > > > > >> >>>> not >>>> > > > >> > > > > >> >>>>> we use JBOD. >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>> Another approach we should consider is for >>>> Kafka >>>> > to >>>> > > > >> > > implement its >>>> > > > >> > > > > >> own >>>> > > > >> > > > > >> >>>>>> storage layer that would stripe across >>>> multiple >>>> > > disks. >>>> > > > >> This >>>> > > > >> > > > > >> wouldn't >>>> > > > >> > > > > >> >>>>>> have to be done at the block level, but >>>> could be >>>> > > done >>>> > > > >> at the >>>> > > > >> > > file >>>> > > > >> > > > > >> >>>> level. >>>> > > > >> > > > > >> >>>>>> We could use consistent hashing to determine >>>> which >>>> > > > >> disks a >>>> > > > >> > > file >>>> > > > >> > > > > >> should >>>> > > > >> > > > > >> >>>>>> end up on, for example. 
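As a purely illustrative aside on the consistent-hashing idea mentioned above (this is not something proposed in the KIP): each disk is placed on a hash ring several times, and a file name is mapped to the first disk at or after its hash on the ring.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy sketch of file-level striping via consistent hashing; not Kafka code.
public class DiskRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public DiskRing(List<String> diskPaths, int virtualNodesPerDisk) {
        for (String disk : diskPaths) {
            for (int i = 0; i < virtualNodesPerDisk; i++) {
                // Multiple virtual nodes per disk smooth out the distribution.
                ring.put(hash(disk + "#" + i), disk);
            }
        }
    }

    // Returns the disk a given file (e.g. a log segment) would be placed on.
    public String diskFor(String fileName) {
        long h = hash(fileName);
        SortedMap<Long, String> tail = ring.tailMap(h);
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            // Fold the first 8 bytes of the MD5 digest into a long.
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xffL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}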
>>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> Are you suggesting that we should distribute >>>> log, >>>> > or >>>> > > > log >>>> > > > >> > > segment, >>>> > > > >> > > > > >> >> across >>>> > > > >> > > > > >> >>>>> disks of brokers? I am not sure if I fully >>>> > understand >>>> > > > >> this >>>> > > > >> > > > > >> approach. My >>>> > > > >> > > > > >> >>>> gut >>>> > > > >> > > > > >> >>>>> feel is that this would be a drastic solution >>>> that >>>> > > > would >>>> > > > >> > > require >>>> > > > >> > > > > >> >>>>> non-trivial design. While this may be useful >>>> to >>>> > > Kafka, >>>> > > > I >>>> > > > >> would >>>> > > > >> > > > > >> prefer >>>> > > > >> > > > > >> >> not >>>> > > > >> > > > > >> >>>>> to discuss this in detail in this thread >>>> unless you >>>> > > > >> believe >>>> > > > >> > > it is >>>> > > > >> > > > > >> >>>> strictly >>>> > > > >> > > > > >> >>>>> superior to the design in this KIP in terms of >>>> > > solving >>>> > > > >> our >>>> > > > >> > > use-case. >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>>>> best, >>>> > > > >> > > > > >> >>>>>> Colin >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> Thanks, >>>> > > > >> > > > > >> >>>>>>> Dong >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin >>>> McCabe < >>>> > > > >> > > > > >> cmcc...@apache.org> >>>> > > > >> > > > > >> >>>>>>> wrote: >>>> > > > >> > > > > >> >>>>>>> >>>> > > > >> > > > > >> >>>>>>>> Hi Dong, >>>> > > > >> > > > > >> >>>>>>>> >>>> > > > >> > > > > >> >>>>>>>> Thanks for the writeup! It's very >>>> interesting. >>>> > > > >> > > > > >> >>>>>>>> >>>> > > > >> > > > > >> >>>>>>>> I apologize in advance if this has been >>>> > discussed >>>> > > > >> > > somewhere else. >>>> > > > >> > > > > >> >>>>> But >>>> > > > >> > > > > >> >>>>>> I >>>> > > > >> > > > > >> >>>>>>>> am curious if you have considered the >>>> solution >>>> > of >>>> > > > >> running >>>> > > > >> > > > > >> multiple >>>> > > > >> > > > > >> >>>>>>>> brokers per node. Clearly there is a >>>> memory >>>> > > > overhead >>>> > > > >> with >>>> > > > >> > > this >>>> > > > >> > > > > >> >>>>>> solution >>>> > > > >> > > > > >> >>>>>>>> because of the fixed cost of starting >>>> multiple >>>> > > JVMs. >>>> > > > >> > > However, >>>> > > > >> > > > > >> >>>>> running >>>> > > > >> > > > > >> >>>>>>>> multiple JVMs would help avoid scalability >>>> > > > >> bottlenecks. >>>> > > > >> > > You >>>> > > > >> > > > > >> could >>>> > > > >> > > > > >> >>>>>>>> probably push more RPCs per second, for >>>> example. >>>> > > A >>>> > > > >> garbage >>>> > > > >> > > > > >> >>>>> collection >>>> > > > >> > > > > >> >>>>>>>> in one broker would not affect the >>>> others. It >>>> > > would >>>> > > > >> be >>>> > > > >> > > > > >> interesting >>>> > > > >> > > > > >> >>>>> to >>>> > > > >> > > > > >> >>>>>>>> see this considered in the "alternate >>>> designs" >>>> > > > design, >>>> > > > >> > > even if >>>> > > > >> > > > > >> you >>>> > > > >> > > > > >> >>>>> end >>>> > > > >> > > > > >> >>>>>>>> up deciding it's not the way to go. 
>>>> > > > >> > > > > >> >>>>>>>> >>>> > > > >> > > > > >> >>>>>>>> best, >>>> > > > >> > > > > >> >>>>>>>> Colin >>>> > > > >> > > > > >> >>>>>>>> >>>> > > > >> > > > > >> >>>>>>>> >>>> > > > >> > > > > >> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin >>>> wrote: >>>> > > > >> > > > > >> >>>>>>>>> Hi all, >>>> > > > >> > > > > >> >>>>>>>>> >>>> > > > >> > > > > >> >>>>>>>>> We created KIP-112: Handle disk failure >>>> for >>>> > JBOD. >>>> > > > >> Please >>>> > > > >> > > find >>>> > > > >> > > > > >> the >>>> > > > >> > > > > >> >>>>> KIP >>>> > > > >> > > > > >> >>>>>>>>> wiki >>>> > > > >> > > > > >> >>>>>>>>> in the link >>>> https://cwiki.apache.org/confl >>>> > > > >> > > > > >> >>>> uence/display/KAFKA/KIP- >>>> > > > >> > > > > >> >>>>>>>>> 112%3A+Handle+disk+failure+for+JBOD. >>>> > > > >> > > > > >> >>>>>>>>> >>>> > > > >> > > > > >> >>>>>>>>> This KIP is related to KIP-113 >>>> > > > >> > > > > >> >>>>>>>>> <https://cwiki.apache.org/conf >>>> > > > >> luence/display/KAFKA/KIP- >>>> > > > >> > > > > >> >>>>>>>> 113%3A+Support+replicas+moveme >>>> > > > >> nt+between+log+directories>: >>>> > > > >> > > > > >> >>>>>>>>> Support replicas movement between log >>>> > > directories. >>>> > > > >> They >>>> > > > >> > > are >>>> > > > >> > > > > >> >>>> needed >>>> > > > >> > > > > >> >>>>> in >>>> > > > >> > > > > >> >>>>>>>>> order >>>> > > > >> > > > > >> >>>>>>>>> to support JBOD in Kafka. Please help >>>> review >>>> > the >>>> > > > >> KIP. You >>>> > > > >> > > > > >> >>>> feedback >>>> > > > >> > > > > >> >>>>> is >>>> > > > >> > > > > >> >>>>>>>>> appreciated! >>>> > > > >> > > > > >> >>>>>>>>> >>>> > > > >> > > > > >> >>>>>>>>> Thanks, >>>> > > > >> > > > > >> >>>>>>>>> Dong >>>> > > > >> > > > > >> >>>>>>>> >>>> > > > >> > > > > >> >>>>>> >>>> > > > >> > > > > >> >>>>> >>>> > > > >> > > > > >> >>>> >>>> > > > >> > > > > >> >> >>>> > > > >> > > > > >> >> >>>> > > > >> > > > > >> >>>> > > > >> > > > > >> >>>> > > > >> > > > > > >>>> > > > >> > > >>>> > > > >> >>>> > > > > >>>> > > > > >>>> > > > >>>> > > >>>> > >>>> >>> >>> >> >