Hey Jun,

Could you please let me know whether the solutions above address your concern? I would really like to move the discussion forward.
Thanks, Dong On Tue, Feb 14, 2017 at 8:17 PM, Dong Lin <lindon...@gmail.com> wrote: > Hey Jun, > > Thanks for all your help and time to discuss this KIP. When you get the > time, could you let me know if the previous answers address the concern? > > I think the more interesting question in your last email is where we > should store the "created" flag in ZK. I proposed the solution that I like > most, i.e. store it together with the replica assignment data in the > /brokers/topics/[topic]. > In order to expedite discussion, let me provide another two ideas to > address the concern just in case the first idea doesn't work: > > - We can avoid extra controller ZK read when there is no disk failure > (95% of time?). When controller starts, it doesn't > read controller_managed_state in ZK and sends LeaderAndIsrRequest with > "create = false". Only if LeaderAndIsrResponse shows failure for any > replica, then controller will read controller_managed_state for this > partition and re-send LeaderAndIsrRequset with "create=true" if this > replica has not been created. > > - We can significantly reduce this ZK read time by making > controller_managed_state a topic level information in ZK, e.g. > /brokers/topics/[topic]/state. Given that most topic has 10+ partition, > the extra ZK read time should be less than 10% of the existing total zk > read time during controller failover. > > Thanks! > Dong > > > On Tue, Feb 14, 2017 at 7:30 AM, Dong Lin <lindon...@gmail.com> wrote: > >> Hey Jun, >> >> I just realized that you may be suggesting that a tool for listing >> offline directories is necessary for KIP-112 by asking whether KIP-112 and >> KIP-113 will be in the same release. I think such a tool is useful but >> doesn't have to be included in KIP-112. This is because as of now admin >> needs to log into broker machine and check broker log to figure out the >> cause of broker failure and the bad log directory in case of disk failure. >> The KIP-112 won't make it harder since admin can still figure out the bad >> log directory by doing the same thing. Thus it is probably OK to just >> include this script in KIP-113. Regardless, my hope is to finish both KIPs >> ASAP and make them in the same release since both KIPs are needed for the >> JBOD setup. >> >> Thanks, >> Dong >> >> On Mon, Feb 13, 2017 at 5:52 PM, Dong Lin <lindon...@gmail.com> wrote: >> >>> And the test plan has also been updated to simulate disk failure by >>> changing log directory permission to 000. >>> >>> On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin <lindon...@gmail.com> wrote: >>> >>>> Hi Jun, >>>> >>>> Thanks for the reply. These comments are very helpful. Let me answer >>>> them inline. >>>> >>>> >>>> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <j...@confluent.io> wrote: >>>> >>>>> Hi, Dong, >>>>> >>>>> Thanks for the reply. A few more replies and new comments below. >>>>> >>>>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindon...@gmail.com> wrote: >>>>> >>>>> > Hi Jun, >>>>> > >>>>> > Thanks for the detailed comments. Please see answers inline: >>>>> > >>>>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <j...@confluent.io> wrote: >>>>> > >>>>> > > Hi, Dong, >>>>> > > >>>>> > > Thanks for the updated wiki. A few comments below. >>>>> > > >>>>> > > 1. Topics get created >>>>> > > 1.1 Instead of storing successfully created replicas in ZK, could >>>>> we >>>>> > store >>>>> > > unsuccessfully created replicas in ZK? Since the latter is less >>>>> common, >>>>> > it >>>>> > > probably reduces the load on ZK. 
>>>>> > > >>>>> > >>>>> > We can store unsuccessfully created replicas in ZK. But I am not >>>>> sure if >>>>> > that can reduce write load on ZK. >>>>> > >>>>> > If we want to reduce write load on ZK using by store unsuccessfully >>>>> created >>>>> > replicas in ZK, then broker should not write to ZK if all replicas >>>>> are >>>>> > successfully created. It means that if /broker/topics/[topic]/partiti >>>>> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for >>>>> a given >>>>> > partition, we have to assume all replicas of this partition have been >>>>> > successfully created and send LeaderAndIsrRequest with create = >>>>> false. This >>>>> > becomes a problem if controller crashes before receiving >>>>> > LeaderAndIsrResponse to validate whether a replica has been created. >>>>> > >>>>> > I think this approach and reduce the number of bytes stored in ZK. >>>>> But I am >>>>> > not sure if this is a concern. >>>>> > >>>>> > >>>>> > >>>>> I was mostly concerned about the controller failover time. Currently, >>>>> the >>>>> controller failover is likely dominated by the cost of reading >>>>> topic/partition level information from ZK. If we add another partition >>>>> level path in ZK, it probably will double the controller failover >>>>> time. If >>>>> the approach of representing the non-created replicas doesn't work, >>>>> have >>>>> you considered just adding the created flag in the leaderAndIsr path >>>>> in ZK? >>>>> >>>>> >>>> Yes, I have considered adding the created flag in the leaderAndIsr path >>>> in ZK. If we were to add created flag per replica in the >>>> LeaderAndIsrRequest, then it requires a lot of change in the code base. >>>> >>>> If we don't add created flag per replica in the LeaderAndIsrRequest, >>>> then the information in leaderAndIsr path in ZK and LeaderAndIsrRequest >>>> would be different. Further, the procedure for broker to update ISR in ZK >>>> will be a bit complicated. When leader updates leaderAndIsr path in ZK, it >>>> will have to first read created flags from ZK, change isr, and write >>>> leaderAndIsr back to ZK. And it needs to check znode version and re-try >>>> write operation in ZK if controller has updated ZK during this period. This >>>> is in contrast to the current implementation where the leader either gets >>>> all the information from LeaderAndIsrRequest sent by controller, or >>>> determine the infromation by itself (e.g. ISR), before writing to >>>> leaderAndIsr path in ZK. >>>> >>>> It seems to me that the above solution is a bit complicated and not >>>> clean. Thus I come up with the design in this KIP to store this created >>>> flag in a separate zk path. The path is named controller_managed_state to >>>> indicate that we can store in this znode all information that is managed by >>>> controller only, as opposed to ISR. >>>> >>>> I agree with your concern of increased ZK read time during controller >>>> failover. How about we store the "created" information in the >>>> znode /brokers/topics/[topic]? We can change that znode to have the >>>> following data format: >>>> >>>> { >>>> "version" : 2, >>>> "created" : { >>>> "1" : [1, 2, 3], >>>> ... >>>> } >>>> "partition" : { >>>> "1" : [1, 2, 3], >>>> ... >>>> } >>>> } >>>> >>>> We won't have extra zk read using this solution. It also seems >>>> reasonable to put the partition assignment information together with >>>> replica creation information. The latter is only changed once after the >>>> partition is created or re-assigned. 
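Since the JSON above gets mangled by the quoting, here is a small self-contained sketch of the proposed /brokers/topics/[topic] layout and of the lookup a controller could do against it when choosing the "create" flag for a LeaderAndIsrRequest. It is only an illustration of the idea in this thread, not Kafka code: the class name is made up, it assumes Jackson is available for JSON parsing, and it reads the per-partition "created" list as "replicas that have already been created successfully".

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Iterator;
import java.util.Map;

public class CreatedFlagSketch {

    // Version-2 layout of /brokers/topics/[topic] as proposed above. Note the
    // comma between the "created" and "partition" maps, which the quoted text
    // is missing; field names are the ones used in this thread, nothing final.
    static final String PROPOSED_ZNODE =
            "{\n" +
            "  \"version\" : 2,\n" +
            "  \"created\" : {\n" +
            "    \"1\" : [1, 2, 3]\n" +
            "  },\n" +
            "  \"partition\" : {\n" +
            "    \"1\" : [1, 2, 3]\n" +
            "  }\n" +
            "}";

    public static void main(String[] args) throws Exception {
        JsonNode root = new ObjectMapper().readTree(PROPOSED_ZNODE);
        JsonNode created = root.get("created");

        // For every partition in the assignment, decide per replica whether it
        // has already been created, i.e. which "create" flag the controller
        // would put into the LeaderAndIsrRequest for that replica.
        Iterator<Map.Entry<String, JsonNode>> assignment = root.get("partition").fields();
        while (assignment.hasNext()) {
            Map.Entry<String, JsonNode> partition = assignment.next();
            JsonNode createdReplicas = created.get(partition.getKey());
            for (JsonNode replica : partition.getValue()) {
                boolean alreadyCreated = createdReplicas != null
                        && contains(createdReplicas, replica.asInt());
                System.out.printf("partition %s, replica %d, create=%b%n",
                        partition.getKey(), replica.asInt(), !alreadyCreated);
            }
        }
    }

    private static boolean contains(JsonNode array, int brokerId) {
        for (JsonNode element : array) {
            if (element.asInt() == brokerId) {
                return true;
            }
        }
        return false;
    }
}

In a real controller this would of course be read from ZooKeeper rather than from a literal string; the snippet only shows the shape of the data and the per-replica decision.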
>>>> >>>> >>>>> >>>>> >>>>> > >>>>> > > 1.2 If an error is received for a follower, does the controller >>>>> eagerly >>>>> > > remove it from ISR or do we just let the leader removes it after >>>>> timeout? >>>>> > > >>>>> > >>>>> > No, Controller will not actively remove it from ISR. But controller >>>>> will >>>>> > recognize it as offline replica and propagate this information to all >>>>> > brokers via UpdateMetadataRequest. Each leader can use this >>>>> information to >>>>> > actively remove offline replica from ISR set. I have updated to wiki >>>>> to >>>>> > clarify it. >>>>> > >>>>> > >>>>> >>>>> That seems inconsistent with how the controller deals with offline >>>>> replicas >>>>> due to broker failures. When that happens, the broker will (1) select >>>>> a new >>>>> leader if the offline replica is the leader; (2) remove the replica >>>>> from >>>>> ISR if the offline replica is the follower. So, intuitively, it seems >>>>> that >>>>> we should be doing the same thing when dealing with offline replicas >>>>> due to >>>>> disk failure. >>>>> >>>> >>>> My bad. I misunderstand how the controller currently handles broker >>>> failure and ISR change. Yes we should do the same thing when dealing with >>>> offline replicas here. I have updated the KIP to specify that, when an >>>> offline replica is discovered by controller, the controller removes offline >>>> replicas from ISR in the ZK and sends LeaderAndIsrRequest with updated ISR >>>> to be used by partition leaders. >>>> >>>> >>>>> >>>>> >>>>> >>>>> > >>>>> > > 1.3 Similar, if an error is received for a leader, should the >>>>> controller >>>>> > > trigger leader election again? >>>>> > > >>>>> > >>>>> > Yes, controller will trigger leader election if leader replica is >>>>> offline. >>>>> > I have updated the wiki to clarify it. >>>>> > >>>>> > >>>>> > > >>>>> > > 2. A log directory stops working on a broker during runtime: >>>>> > > 2.1 It seems the broker remembers the failed directory after >>>>> hitting an >>>>> > > IOException and the failed directory won't be used for creating new >>>>> > > partitions until the broker is restarted? If so, could you add >>>>> that to >>>>> > the >>>>> > > wiki. >>>>> > > >>>>> > >>>>> > Right, broker assumes a log directory to be good after it starts, >>>>> and mark >>>>> > log directory as bad once there is IOException when broker attempts >>>>> to >>>>> > access the log directory. New replicas will only be created on good >>>>> log >>>>> > directory. I just added this to the KIP. >>>>> > >>>>> > >>>>> > > 2.2 Could you be a bit more specific on how and during which >>>>> operation >>>>> > the >>>>> > > broker detects directory failure? Is it when the broker hits an >>>>> > IOException >>>>> > > during writes, or both reads and writes? For example, during >>>>> broker >>>>> > > startup, it only reads from each of the log directories, if it >>>>> hits an >>>>> > > IOException there, does the broker immediately mark the directory >>>>> as >>>>> > > offline? >>>>> > > >>>>> > >>>>> > Broker marks log directory as bad once there is IOException when >>>>> broker >>>>> > attempts to access the log directory. This includes read and write. >>>>> These >>>>> > operations include log append, log read, log cleaning, watermark >>>>> checkpoint >>>>> > etc. If broker hits IOException when it reads from each of the log >>>>> > directory during startup, it immediately mark the directory as >>>>> offline. >>>>> > >>>>> > I just updated the KIP to clarify it. >>>>> > >>>>> > >>>>> > > 3. 
Partition reassignment: If we know a replica is offline, do we >>>>> still >>>>> > > want to send StopReplicaRequest to it? >>>>> > > >>>>> > >>>>> > No, controller doesn't send StopReplicaRequest for an offline >>>>> replica. >>>>> > Controller treats this scenario in the same way that exiting Kafka >>>>> > implementation does when the broker of this replica is offline. >>>>> > >>>>> > >>>>> > > >>>>> > > 4. UpdateMetadataRequestPartitionState: For offline_replicas, do >>>>> they >>>>> > only >>>>> > > include offline replicas due to log directory failures or do they >>>>> also >>>>> > > include offline replicas due to broker failure? >>>>> > > >>>>> > >>>>> > UpdateMetadataRequestPartitionState's offline_replicas include >>>>> offline >>>>> > replicas due to both log directory failure and broker failure. This >>>>> is to >>>>> > make the semantics of this field easier to understand. Broker can >>>>> > distinguish whether a replica is offline due to broker failure or >>>>> disk >>>>> > failure by checking whether a broker is live in the >>>>> UpdateMetadataRequest. >>>>> > >>>>> > >>>>> > > >>>>> > > 5. Tools: Could we add some kind of support in the tool to list >>>>> offline >>>>> > > directories? >>>>> > > >>>>> > >>>>> > In KIP-112 we don't have tools to list offline directories because >>>>> we have >>>>> > intentionally avoided exposing log directory information (e.g. log >>>>> > directory path) to user or other brokers. I think we can add this >>>>> feature >>>>> > in KIP-113, in which we will have DescribeDirsRequest to list log >>>>> directory >>>>> > information (e.g. partition assignment, path, size) needed for >>>>> rebalance. >>>>> > >>>>> > >>>>> Since we are introducing a new failure mode, if a replica becomes >>>>> offline >>>>> due to failure in log directories, the first thing an admin wants to >>>>> know >>>>> is which log directories are offline from the broker's perspective. >>>>> So, >>>>> including such a tool will be useful. Do you plan to do KIP-112 and >>>>> KIP-113 >>>>> in the same release? >>>>> >>>>> >>>> Yes, I agree that including such a tool is using. This is probably >>>> better to be added in KIP-113 because we need DescribeDirsRequest to get >>>> this information. I will update KIP-113 to include this tool. >>>> >>>> I plan to do KIP-112 and KIP-113 separately to make each KIP and their >>>> patch easier to review. I don't have any plan about which release to have >>>> these KIPs. My plan is to both of them ASAP. Is there particular timeline >>>> you prefer for code of these two KIPs to checked-in? >>>> >>>> >>>>> > >>>>> > > >>>>> > > 6. Metrics: Could we add some metrics to show offline directories? >>>>> > > >>>>> > >>>>> > Sure. I think it makes sense to have each broker report its number of >>>>> > offline replicas and offline log directories. The previous metric >>>>> was put >>>>> > in KIP-113. I just added both metrics in KIP-112. >>>>> > >>>>> > >>>>> > > >>>>> > > 7. There are still references to kafka-log-dirs.sh. Are they valid? >>>>> > > >>>>> > >>>>> > My bad. I just removed this from "Changes in Operational Procedures" >>>>> and >>>>> > "Test Plan" in the KIP. >>>>> > >>>>> > >>>>> > > >>>>> > > 8. Do you think KIP-113 is ready for review? One thing that KIP-113 >>>>> > > mentions during partition reassignment is to first send >>>>> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It seems >>>>> it's >>>>> > > better if the replicas are created in the right log directory in >>>>> the >>>>> > first >>>>> > > place? 
The reason that I brought it up here is because it may >>>>> affect the >>>>> > > protocol of LeaderAndIsrRequest. >>>>> > > >>>>> > >>>>> > Yes, KIP-113 is ready for review. The advantage of the current >>>>> design is >>>>> > that we can keep LeaderAndIsrRequest log-direcotry-agnostic. The >>>>> > implementation would be much easier to read if all log related logic >>>>> (e.g. >>>>> > various errors) are put in ChangeReplicadIRrequest and the code path >>>>> of >>>>> > handling replica movement is separated from leadership handling. >>>>> > >>>>> > In other words, I think Kafka may be easier to develop in the long >>>>> term if >>>>> > we separate these two requests. >>>>> > >>>>> > I agree that ideally we want to create replicas in the right log >>>>> directory >>>>> > in the first place. But I am not sure if there is any performance or >>>>> > correctness concern with the existing way of moving it after it is >>>>> created. >>>>> > Besides, does this decision affect the change proposed in KIP-112? >>>>> > >>>>> > >>>>> I am just wondering if you have considered including the log directory >>>>> for >>>>> the replicas in the LeaderAndIsrRequest. >>>>> >>>>> >>>> Yeah I have thought about this idea, but only briefly. I rejected this >>>> idea because log directory is broker's local information and I prefer not >>>> to expose local config information to the cluster through >>>> LeaderAndIsrRequest. >>>> >>>> >>>>> 9. Could you describe when the offline replicas due to log directory >>>>> failure are removed from the replica fetch threads? >>>>> >>>> >>>> Yes. If the offline replica was a leader, either a new leader is >>>> elected or all follower brokers will stop fetching for this partition. If >>>> the offline replica is a follower, the broker will stop fetching for this >>>> replica immediately. A broker stops fetching data for a replica by removing >>>> the replica from the replica fetch threads. I have updated the KIP to >>>> clarify it. >>>> >>>> >>>>> >>>>> 10. The wiki mentioned changing the log directory to a file for >>>>> simulating >>>>> disk failure in system tests. Could we just change the permission of >>>>> the >>>>> log directory to 000 to simulate that? >>>>> >>>> >>>> >>>> Sure, >>>> >>>> >>>>> >>>>> Thanks, >>>>> >>>>> Jun >>>>> >>>>> >>>>> > > Jun >>>>> > > >>>>> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin <lindon...@gmail.com> >>>>> wrote: >>>>> > > >>>>> > > > Hi Jun, >>>>> > > > >>>>> > > > Can I replace zookeeper access with direct RPC for both ISR >>>>> > notification >>>>> > > > and disk failure notification in a future KIP, or do you feel we >>>>> should >>>>> > > do >>>>> > > > it in this KIP? >>>>> > > > >>>>> > > > Hi Eno, Grant and everyone, >>>>> > > > >>>>> > > > Is there further improvement you would like to see with this KIP? >>>>> > > > >>>>> > > > Thanks you all for the comments, >>>>> > > > >>>>> > > > Dong >>>>> > > > >>>>> > > > >>>>> > > > >>>>> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin <lindon...@gmail.com> >>>>> wrote: >>>>> > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe < >>>>> cmcc...@apache.org> >>>>> > > wrote: >>>>> > > > > >>>>> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote: >>>>> > > > >> > Thanks for all the comments Colin! >>>>> > > > >> > >>>>> > > > >> > To answer your questions: >>>>> > > > >> > - Yes, a broker will shutdown if all its log directories >>>>> are bad. >>>>> > > > >> >>>>> > > > >> That makes sense. Can you add this to the writeup? 
>>>>> > > > >> >>>>> > > > > >>>>> > > > > Sure. This has already been added. You can find it here >>>>> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversio >>>>> n.action >>>>> > ? >>>>> > > > pageId=67638402&selectedPageVersions=9&selectedPageVersions=10> >>>>> > > > > . >>>>> > > > > >>>>> > > > > >>>>> > > > >> >>>>> > > > >> > - I updated the KIP to explicitly state that a log >>>>> directory will >>>>> > be >>>>> > > > >> > assumed to be good until broker sees IOException when it >>>>> tries to >>>>> > > > access >>>>> > > > >> > the log directory. >>>>> > > > >> >>>>> > > > >> Thanks. >>>>> > > > >> >>>>> > > > >> > - Controller doesn't explicitly know whether there is new >>>>> log >>>>> > > > directory >>>>> > > > >> > or >>>>> > > > >> > not. All controller knows is whether replicas are online or >>>>> > offline >>>>> > > > >> based >>>>> > > > >> > on LeaderAndIsrResponse. According to the existing Kafka >>>>> > > > implementation, >>>>> > > > >> > controller will always send LeaderAndIsrRequest to a broker >>>>> after >>>>> > it >>>>> > > > >> > bounces. >>>>> > > > >> >>>>> > > > >> I thought so. It's good to clarify, though. Do you think >>>>> it's >>>>> > worth >>>>> > > > >> adding a quick discussion of this on the wiki? >>>>> > > > >> >>>>> > > > > >>>>> > > > > Personally I don't think it is needed. If broker starts with >>>>> no bad >>>>> > log >>>>> > > > > directory, everything should work it is and we should not need >>>>> to >>>>> > > clarify >>>>> > > > > it. The KIP has already covered the scenario when a broker >>>>> starts >>>>> > with >>>>> > > > bad >>>>> > > > > log directory. Also, the KIP doesn't claim or hint that we >>>>> support >>>>> > > > dynamic >>>>> > > > > addition of new log directories. I think we are good. >>>>> > > > > >>>>> > > > > >>>>> > > > >> best, >>>>> > > > >> Colin >>>>> > > > >> >>>>> > > > >> > >>>>> > > > >> > Please see this >>>>> > > > >> > <https://cwiki.apache.org/conf >>>>> luence/pages/diffpagesbyversio >>>>> > > > >> n.action?pageId=67638402&selectedPageVersions=9& >>>>> > > > selectedPageVersions=10> >>>>> > > > >> > for the change of the KIP. >>>>> > > > >> > >>>>> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe < >>>>> cmcc...@apache.org >>>>> > > >>>>> > > > >> wrote: >>>>> > > > >> > >>>>> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote: >>>>> > > > >> > > > Thanks, Dong L. >>>>> > > > >> > > > >>>>> > > > >> > > > Do we plan on bringing down the broker process when all >>>>> log >>>>> > > > >> directories >>>>> > > > >> > > > are offline? >>>>> > > > >> > > > >>>>> > > > >> > > > Can you explicitly state on the KIP that the log dirs >>>>> are all >>>>> > > > >> considered >>>>> > > > >> > > > good after the broker process is bounced? It seems >>>>> like an >>>>> > > > >> important >>>>> > > > >> > > > thing to be clear about. Also, perhaps discuss how the >>>>> > > controller >>>>> > > > >> > > > becomes aware of the newly good log directories after a >>>>> broker >>>>> > > > >> bounce >>>>> > > > >> > > > (and whether this triggers re-election). >>>>> > > > >> > > >>>>> > > > >> > > I meant to write, all the log dirs where the broker can >>>>> still >>>>> > read >>>>> > > > the >>>>> > > > >> > > index and some other files. Clearly, log dirs that are >>>>> > completely >>>>> > > > >> > > inaccessible will still be considered bad after a broker >>>>> process >>>>> > > > >> bounce. 
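To make the lifecycle above concrete (every configured log directory is assumed good at startup, goes offline on the first IOException seen while accessing it, and is considered good again after a restart as long as it is readable), here is a minimal, self-contained sketch of that bookkeeping. It is not Kafka's actual log-manager code; the class, method and interface names are invented purely for illustration.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: a minimal model of the "assume good until the first
// IOException" rule described above. Directories start out online when the
// broker comes up; a directory moves to the offline set the first time any
// access to it throws IOException, and stays offline until the broker restarts.
public class LogDirectoryTracker {

    private final Set<Path> online = new LinkedHashSet<>();
    private final Set<Path> offline = new LinkedHashSet<>();

    public LogDirectoryTracker(List<Path> configuredLogDirs) {
        online.addAll(configuredLogDirs);
    }

    // Wraps any disk operation (append, read, cleaning, checkpointing, startup
    // load); an IOException marks the directory offline before re-throwing.
    public <T> T access(Path logDir, IoOperation<T> operation) throws IOException {
        try {
            return operation.run();
        } catch (IOException e) {
            online.remove(logDir);
            offline.add(logDir);
            // In the proposal the broker would also notify the controller via
            // the ZooKeeper notification path and fail the affected replicas.
            throw e;
        }
    }

    // Only online directories are candidates for newly created replicas.
    public Set<Path> onlineLogDirs() {
        return Collections.unmodifiableSet(online);
    }

    // A broker with no usable log directory shuts down.
    public boolean shouldShutdown() {
        return online.isEmpty();
    }

    @FunctionalInterface
    public interface IoOperation<T> {
        T run() throws IOException;
    }

    public static void main(String[] args) {
        Path dir = Path.of("/data1/kafka-logs");   // placeholder path
        LogDirectoryTracker tracker = new LogDirectoryTracker(List.of(dir));
        try {
            // A read access; any IOException here marks the directory offline.
            tracker.access(dir, () -> {
                try (var entries = Files.list(dir)) {
                    return entries.count();
                }
            });
        } catch (IOException e) {
            System.out.println(dir + " is now offline: " + e);
        }
        System.out.println("shut down broker: " + tracker.shouldShutdown());
    }
}

The point of the wrapper style is only to show that "offline" is a broker-local, in-memory judgment triggered by the first failed access, which matches the description in points 2.1/2.2 and the bounce behavior clarified above.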
>>>>> > > > >> > > >>>>> > > > >> > > best, >>>>> > > > >> > > Colin >>>>> > > > >> > > >>>>> > > > >> > > > >>>>> > > > >> > > > +1 (non-binding) aside from that >>>>> > > > >> > > > >>>>> > > > >> > > > >>>>> > > > >> > > > >>>>> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote: >>>>> > > > >> > > > > Hi all, >>>>> > > > >> > > > > >>>>> > > > >> > > > > Thank you all for the helpful suggestion. I have >>>>> updated the >>>>> > > KIP >>>>> > > > >> to >>>>> > > > >> > > > > address >>>>> > > > >> > > > > the comments received so far. See here >>>>> > > > >> > > > > <https://cwiki.apache.org/conf >>>>> > luence/pages/diffpagesbyversio >>>>> > > > >> n.action? >>>>> > > > >> > > pageId=67638402&selectedPageVe >>>>> rsions=8&selectedPageVersions= >>>>> > 9>to >>>>> > > > >> > > > > read the changes of the KIP. Here is a summary of >>>>> change: >>>>> > > > >> > > > > >>>>> > > > >> > > > > - Updated the Proposed Change section to change the >>>>> recovery >>>>> > > > >> steps. >>>>> > > > >> > > After >>>>> > > > >> > > > > this change, broker will also create replica as long >>>>> as all >>>>> > > log >>>>> > > > >> > > > > directories >>>>> > > > >> > > > > are working. >>>>> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since user >>>>> no >>>>> > longer >>>>> > > > >> needs to >>>>> > > > >> > > > > use >>>>> > > > >> > > > > it for recovery from bad disks. >>>>> > > > >> > > > > - Explained how the znode controller_managed_state is >>>>> > managed >>>>> > > in >>>>> > > > >> the >>>>> > > > >> > > > > Public >>>>> > > > >> > > > > interface section. >>>>> > > > >> > > > > - Explained what happens during controller failover, >>>>> > partition >>>>> > > > >> > > > > reassignment >>>>> > > > >> > > > > and topic deletion in the Proposed Change section. >>>>> > > > >> > > > > - Updated Future Work section to include the following >>>>> > > potential >>>>> > > > >> > > > > improvements >>>>> > > > >> > > > > - Let broker notify controller of ISR change and >>>>> disk >>>>> > state >>>>> > > > >> change >>>>> > > > >> > > via >>>>> > > > >> > > > > RPC instead of using zookeeper >>>>> > > > >> > > > > - Handle various failure scenarios (e.g. slow disk) >>>>> on a >>>>> > > > >> case-by-case >>>>> > > > >> > > > > basis. For example, we may want to detect slow disk >>>>> and >>>>> > > consider >>>>> > > > >> it as >>>>> > > > >> > > > > offline. >>>>> > > > >> > > > > - Allow admin to mark a directory as bad so that it >>>>> will >>>>> > not >>>>> > > > be >>>>> > > > >> used. >>>>> > > > >> > > > > >>>>> > > > >> > > > > Thanks, >>>>> > > > >> > > > > Dong >>>>> > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin < >>>>> > lindon...@gmail.com >>>>> > > > >>>>> > > > >> wrote: >>>>> > > > >> > > > > >>>>> > > > >> > > > > > Hey Eno, >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > Thanks much for the comment! >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > I still think the complexity added to Kafka is >>>>> justified >>>>> > by >>>>> > > > its >>>>> > > > >> > > benefit. >>>>> > > > >> > > > > > Let me provide my reasons below. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > 1) The additional logic is easy to understand and >>>>> thus its >>>>> > > > >> complexity >>>>> > > > >> > > > > > should be reasonable. 
>>>>> > > > >> > > > > > >>>>> > > > >> > > > > > On the broker side, it needs to catch exception when >>>>> > access >>>>> > > > log >>>>> > > > >> > > directory, >>>>> > > > >> > > > > > mark log directory and all its replicas as offline, >>>>> notify >>>>> > > > >> > > controller by >>>>> > > > >> > > > > > writing the zookeeper notification path, and >>>>> specify error >>>>> > > in >>>>> > > > >> > > > > > LeaderAndIsrResponse. On the controller side, it >>>>> will >>>>> > > listener >>>>> > > > >> to >>>>> > > > >> > > > > > zookeeper for disk failure notification, learn about >>>>> > offline >>>>> > > > >> > > replicas in >>>>> > > > >> > > > > > the LeaderAndIsrResponse, and take offline replicas >>>>> into >>>>> > > > >> > > consideration when >>>>> > > > >> > > > > > electing leaders. It also mark replica as created in >>>>> > > zookeeper >>>>> > > > >> and >>>>> > > > >> > > use it >>>>> > > > >> > > > > > to determine whether a replica is created. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > That is all the logic we need to add in Kafka. I >>>>> > personally >>>>> > > > feel >>>>> > > > >> > > this is >>>>> > > > >> > > > > > easy to reason about. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > 2) The additional code is not much. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > I expect the code for KIP-112 to be around 1100 >>>>> lines new >>>>> > > > code. >>>>> > > > >> > > Previously >>>>> > > > >> > > > > > I have implemented a prototype of a slightly >>>>> different >>>>> > > design >>>>> > > > >> (see >>>>> > > > >> > > here >>>>> > > > >> > > > > > <https://docs.google.com/docum >>>>> ent/d/1Izza0SBmZMVUBUt9s_ >>>>> > > > >> > > -Dqi3D8e0KGJQYW8xgEdRsgAI/edit>) >>>>> > > > >> > > > > > and uploaded it to github (see here >>>>> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD>). >>>>> The >>>>> > patch >>>>> > > > >> changed >>>>> > > > >> > > 33 >>>>> > > > >> > > > > > files, added 1185 lines and deleted 183 lines. The >>>>> size of >>>>> > > > >> prototype >>>>> > > > >> > > patch >>>>> > > > >> > > > > > is actually smaller than patch of KIP-107 (see here >>>>> > > > >> > > > > > <https://github.com/apache/kafka/pull/2476>) which >>>>> is >>>>> > > already >>>>> > > > >> > > accepted. >>>>> > > > >> > > > > > The KIP-107 patch changed 49 files, added 1349 >>>>> lines and >>>>> > > > >> deleted 141 >>>>> > > > >> > > lines. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > 3) Comparison with one-broker-per-multiple-volumes >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > This KIP can improve the availability of Kafka in >>>>> this >>>>> > case >>>>> > > > such >>>>> > > > >> > > that one >>>>> > > > >> > > > > > failed volume doesn't bring down the entire broker. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > 4) Comparison with one-broker-per-volume >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > If each volume maps to multiple disks, then we >>>>> still have >>>>> > > > >> similar >>>>> > > > >> > > problem >>>>> > > > >> > > > > > such that the broker will fail if any disk of the >>>>> volume >>>>> > > > failed. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > If each volume maps to one disk, it means that we >>>>> need to >>>>> > > > >> deploy 10 >>>>> > > > >> > > > > > brokers on a machine if the machine has 10 disks. I >>>>> will >>>>> > > > >> explain the >>>>> > > > >> > > > > > concern with this approach in order of their >>>>> importance. 
>>>>> > > > >> > > > > > >>>>> > > > >> > > > > > - It is weird if we were to tell kafka user to >>>>> deploy 50 >>>>> > > > >> brokers on a >>>>> > > > >> > > > > > machine of 50 disks. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > - Either when user deploys Kafka on a commercial >>>>> cloud >>>>> > > > platform >>>>> > > > >> or >>>>> > > > >> > > when >>>>> > > > >> > > > > > user deploys their own cluster, the size or largest >>>>> disk >>>>> > is >>>>> > > > >> usually >>>>> > > > >> > > > > > limited. There will be scenarios where user want to >>>>> > increase >>>>> > > > >> broker >>>>> > > > >> > > > > > capacity by having multiple disks per broker. This >>>>> JBOD >>>>> > KIP >>>>> > > > >> makes it >>>>> > > > >> > > > > > feasible without hurting availability due to single >>>>> disk >>>>> > > > >> failure. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > - Automatic load rebalance across disks will be >>>>> easier and >>>>> > > > more >>>>> > > > >> > > flexible >>>>> > > > >> > > > > > if one broker has multiple disks. This can be >>>>> future work. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > - There is performance concern when you deploy 10 >>>>> broker >>>>> > > vs. 1 >>>>> > > > >> > > broker on >>>>> > > > >> > > > > > one machine. The metadata the cluster, including >>>>> > > FetchRequest, >>>>> > > > >> > > > > > ProduceResponse, MetadataRequest and so on will all >>>>> be 10X >>>>> > > > >> more. The >>>>> > > > >> > > > > > packet-per-second will be 10X higher which may limit >>>>> > > > >> performance if >>>>> > > > >> > > pps is >>>>> > > > >> > > > > > the performance bottleneck. The number of socket on >>>>> the >>>>> > > > machine >>>>> > > > >> is >>>>> > > > >> > > 10X >>>>> > > > >> > > > > > higher. And the number of replication thread will >>>>> be 100X >>>>> > > > more. >>>>> > > > >> The >>>>> > > > >> > > impact >>>>> > > > >> > > > > > will be more significant with increasing number of >>>>> disks >>>>> > per >>>>> > > > >> > > machine. Thus >>>>> > > > >> > > > > > it will limit Kakfa's scalability in the long term. >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > Thanks, >>>>> > > > >> > > > > > Dong >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska < >>>>> > > > >> eno.there...@gmail.com >>>>> > > > >> > > > >>>>> > > > >> > > > > > wrote: >>>>> > > > >> > > > > > >>>>> > > > >> > > > > >> Hi Dong, >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> To simplify the discussion today, on my part I'll >>>>> zoom >>>>> > into >>>>> > > > one >>>>> > > > >> > > thing >>>>> > > > >> > > > > >> only: >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> - I'll discuss the options called below : >>>>> > > > >> "one-broker-per-disk" or >>>>> > > > >> > > > > >> "one-broker-per-few-disks". >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments so >>>>> there is >>>>> > > no >>>>> > > > >> need to >>>>> > > > >> > > > > >> discuss that part for me. I buy it that JBODs are >>>>> good. >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> I find the terminology can be improved a bit. >>>>> Ideally >>>>> > we'd >>>>> > > be >>>>> > > > >> > > talking >>>>> > > > >> > > > > >> about volumes, not disks. Just to make it clear >>>>> that >>>>> > Kafka >>>>> > > > >> > > understand >>>>> > > > >> > > > > >> volumes/directories, not individual raw disks. 
So >>>>> by >>>>> > > > >> > > > > >> "one-broker-per-few-disks" what I mean is that the >>>>> admin >>>>> > > can >>>>> > > > >> pool a >>>>> > > > >> > > few >>>>> > > > >> > > > > >> disks together to create a volume/directory and >>>>> give that >>>>> > > to >>>>> > > > >> Kafka. >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> The kernel of my question will be that the admin >>>>> already >>>>> > > has >>>>> > > > >> tools >>>>> > > > >> > > to 1) >>>>> > > > >> > > > > >> create volumes/directories from a JBOD and 2) >>>>> start a >>>>> > > broker >>>>> > > > >> on a >>>>> > > > >> > > desired >>>>> > > > >> > > > > >> machine and 3) assign a broker resources like a >>>>> > directory. >>>>> > > I >>>>> > > > >> claim >>>>> > > > >> > > that >>>>> > > > >> > > > > >> those tools are sufficient to optimise resource >>>>> > allocation. >>>>> > > > I >>>>> > > > >> > > understand >>>>> > > > >> > > > > >> that a broker could manage point 3) itself, ie >>>>> juggle the >>>>> > > > >> > > directories. My >>>>> > > > >> > > > > >> question is whether the complexity added to Kafka >>>>> is >>>>> > > > justified. >>>>> > > > >> > > > > >> Operationally it seems to me an admin will still >>>>> have to >>>>> > do >>>>> > > > >> all the >>>>> > > > >> > > three >>>>> > > > >> > > > > >> items above. >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> Looking forward to the discussion >>>>> > > > >> > > > > >> Thanks >>>>> > > > >> > > > > >> Eno >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> >>>>> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin < >>>>> lindon...@gmail.com >>>>> > > >>>>> > > > >> wrote: >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > Hey Eno, >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > Thanks much for the review. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > I think your suggestion is to split disks of a >>>>> machine >>>>> > > into >>>>> > > > >> > > multiple >>>>> > > > >> > > > > >> disk >>>>> > > > >> > > > > >> > sets and run one broker per disk set. Yeah this >>>>> is >>>>> > > similar >>>>> > > > to >>>>> > > > >> > > Colin's >>>>> > > > >> > > > > >> > suggestion of one-broker-per-disk, which we have >>>>> > > evaluated >>>>> > > > at >>>>> > > > >> > > LinkedIn >>>>> > > > >> > > > > >> and >>>>> > > > >> > > > > >> > considered it to be a good short term approach. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > As of now I don't think any of these approach is >>>>> a >>>>> > better >>>>> > > > >> > > alternative in >>>>> > > > >> > > > > >> > the long term. I will summarize these here. I >>>>> have put >>>>> > > > these >>>>> > > > >> > > reasons in >>>>> > > > >> > > > > >> the >>>>> > > > >> > > > > >> > KIP's motivation section and rejected alternative >>>>> > > section. >>>>> > > > I >>>>> > > > >> am >>>>> > > > >> > > happy to >>>>> > > > >> > > > > >> > discuss more and I would certainly like to use an >>>>> > > > alternative >>>>> > > > >> > > solution >>>>> > > > >> > > > > >> that >>>>> > > > >> > > > > >> > is easier to do with better performance. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > - JBOD vs. 
RAID-10: if we switch from RAID-10 >>>>> with >>>>> > > > >> > > > > >> replication-factoer=2 to >>>>> > > > >> > > > > >> > JBOD with replicatio-factor=3, we get 25% >>>>> reduction in >>>>> > > disk >>>>> > > > >> usage >>>>> > > > >> > > and >>>>> > > > >> > > > > >> > doubles the tolerance of broker failure before >>>>> data >>>>> > > > >> > > unavailability from >>>>> > > > >> > > > > >> 1 >>>>> > > > >> > > > > >> > to 2. This is pretty huge gain for any company >>>>> that >>>>> > uses >>>>> > > > >> Kafka at >>>>> > > > >> > > large >>>>> > > > >> > > > > >> > scale. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit of >>>>> > > > >> > > one-broker-per-disk is >>>>> > > > >> > > > > >> that >>>>> > > > >> > > > > >> > no major code change is needed in Kafka. Among >>>>> the >>>>> > > > >> disadvantage of >>>>> > > > >> > > > > >> > one-broker-per-disk summarized in the KIP and >>>>> previous >>>>> > > > email >>>>> > > > >> with >>>>> > > > >> > > Colin, >>>>> > > > >> > > > > >> > the biggest one is the 15% throughput loss >>>>> compared to >>>>> > > JBOD >>>>> > > > >> and >>>>> > > > >> > > less >>>>> > > > >> > > > > >> > flexibility to balance across disks. Further, it >>>>> > probably >>>>> > > > >> requires >>>>> > > > >> > > > > >> change >>>>> > > > >> > > > > >> > to internal deployment tools at various >>>>> companies to >>>>> > deal >>>>> > > > >> with >>>>> > > > >> > > > > >> > one-broker-per-disk setup. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup that used at >>>>> > > > Microsoft. >>>>> > > > >> The >>>>> > > > >> > > > > >> problem is >>>>> > > > >> > > > > >> > that a broker becomes unavailable if any disk >>>>> fail. >>>>> > > Suppose >>>>> > > > >> > > > > >> > replication-factor=2 and there are 10 disks per >>>>> > machine. >>>>> > > > >> Then the >>>>> > > > >> > > > > >> > probability of of any message becomes >>>>> unavailable due >>>>> > to >>>>> > > > disk >>>>> > > > >> > > failure >>>>> > > > >> > > > > >> with >>>>> > > > >> > > > > >> > RAID-0 is 100X higher than that with JBOD. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks: >>>>> > > > one-broker-per-few-disk >>>>> > > > >> is >>>>> > > > >> > > > > >> somewhere >>>>> > > > >> > > > > >> > between one-broker-per-disk and RAID-0. So it >>>>> carries >>>>> > an >>>>> > > > >> averaged >>>>> > > > >> > > > > >> > disadvantages of these two approaches. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > To answer your question regarding, I think it is >>>>> > > reasonable >>>>> > > > >> to >>>>> > > > >> > > mange >>>>> > > > >> > > > > >> disk >>>>> > > > >> > > > > >> > in Kafka. By "managing disks" we mean the >>>>> management of >>>>> > > > >> > > assignment of >>>>> > > > >> > > > > >> > replicas across disks. Here are my reasons in >>>>> more >>>>> > > detail: >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > - I don't think this KIP is a big step change. By >>>>> > > allowing >>>>> > > > >> user to >>>>> > > > >> > > > > >> > configure Kafka to run multiple log directories >>>>> or >>>>> > disks >>>>> > > as >>>>> > > > >> of >>>>> > > > >> > > now, it >>>>> > > > >> > > > > >> is >>>>> > > > >> > > > > >> > implicit that Kafka manages disks. It is just >>>>> not a >>>>> > > > complete >>>>> > > > >> > > feature. 
>>>>> > > > >> > > > > >> > Microsoft and probably other companies are using >>>>> this >>>>> > > > feature >>>>> > > > >> > > under the >>>>> > > > >> > > > > >> > undesirable effect that a broker will fail any >>>>> if any >>>>> > > disk >>>>> > > > >> fail. >>>>> > > > >> > > It is >>>>> > > > >> > > > > >> good >>>>> > > > >> > > > > >> > to complete this feature. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > - I think it is reasonable to manage disk in >>>>> Kafka. One >>>>> > > of >>>>> > > > >> the >>>>> > > > >> > > most >>>>> > > > >> > > > > >> > important work that Kafka is doing is to >>>>> determine the >>>>> > > > >> replica >>>>> > > > >> > > > > >> assignment >>>>> > > > >> > > > > >> > across brokers and make sure enough copies of a >>>>> given >>>>> > > > >> replica is >>>>> > > > >> > > > > >> available. >>>>> > > > >> > > > > >> > I would argue that it is not much different than >>>>> > > > determining >>>>> > > > >> the >>>>> > > > >> > > replica >>>>> > > > >> > > > > >> > assignment across disk conceptually. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > - I would agree that this KIP is improve >>>>> performance of >>>>> > > > >> Kafka at >>>>> > > > >> > > the >>>>> > > > >> > > > > >> cost >>>>> > > > >> > > > > >> > of more complexity inside Kafka, by switching >>>>> from >>>>> > > RAID-10 >>>>> > > > to >>>>> > > > >> > > JBOD. I >>>>> > > > >> > > > > >> would >>>>> > > > >> > > > > >> > argue that this is a right direction. If we can >>>>> gain >>>>> > 20%+ >>>>> > > > >> > > performance by >>>>> > > > >> > > > > >> > managing NIC in Kafka as compared to existing >>>>> approach >>>>> > > and >>>>> > > > >> other >>>>> > > > >> > > > > >> > alternatives, I would say we should just do it. >>>>> Such a >>>>> > > gain >>>>> > > > >> in >>>>> > > > >> > > > > >> performance, >>>>> > > > >> > > > > >> > or equivalently reduction in cost, can save >>>>> millions of >>>>> > > > >> dollars >>>>> > > > >> > > per year >>>>> > > > >> > > > > >> > for any company running Kafka at large scale. >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > Thanks, >>>>> > > > >> > > > > >> > Dong >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska < >>>>> > > > >> > > eno.there...@gmail.com> >>>>> > > > >> > > > > >> wrote: >>>>> > > > >> > > > > >> > >>>>> > > > >> > > > > >> >> I'm coming somewhat late to the discussion, >>>>> apologies >>>>> > > for >>>>> > > > >> that. >>>>> > > > >> > > > > >> >> >>>>> > > > >> > > > > >> >> I'm worried about this proposal. It's moving >>>>> Kafka to >>>>> > a >>>>> > > > >> world >>>>> > > > >> > > where it >>>>> > > > >> > > > > >> >> manages disks. So in a sense, the scope of the >>>>> KIP is >>>>> > > > >> limited, >>>>> > > > >> > > but the >>>>> > > > >> > > > > >> >> direction it sets for Kafka is quite a big step >>>>> > change. >>>>> > > > >> > > Fundamentally >>>>> > > > >> > > > > >> this >>>>> > > > >> > > > > >> >> is about balancing resources for a Kafka >>>>> broker. This >>>>> > > can >>>>> > > > be >>>>> > > > >> > > done by a >>>>> > > > >> > > > > >> >> tool, rather than by changing Kafka. 
E.g., the >>>>> tool >>>>> > > would >>>>> > > > >> take a >>>>> > > > >> > > bunch >>>>> > > > >> > > > > >> of >>>>> > > > >> > > > > >> >> disks together, create a volume over them and >>>>> export >>>>> > > that >>>>> > > > >> to a >>>>> > > > >> > > Kafka >>>>> > > > >> > > > > >> broker >>>>> > > > >> > > > > >> >> (in addition to setting the memory limits for >>>>> that >>>>> > > broker >>>>> > > > or >>>>> > > > >> > > limiting >>>>> > > > >> > > > > >> other >>>>> > > > >> > > > > >> >> resources). A different bunch of disks can then >>>>> make >>>>> > up >>>>> > > a >>>>> > > > >> second >>>>> > > > >> > > > > >> volume, >>>>> > > > >> > > > > >> >> and be used by another Kafka broker. This is >>>>> aligned >>>>> > > with >>>>> > > > >> what >>>>> > > > >> > > Colin is >>>>> > > > >> > > > > >> >> saying (as I understand it). >>>>> > > > >> > > > > >> >> >>>>> > > > >> > > > > >> >> Disks are not the only resource on a machine, >>>>> there >>>>> > are >>>>> > > > >> several >>>>> > > > >> > > > > >> instances >>>>> > > > >> > > > > >> >> where multiple NICs are used for example. Do we >>>>> want >>>>> > > fine >>>>> > > > >> grained >>>>> > > > >> > > > > >> >> management of all these resources? I'd argue >>>>> that >>>>> > opens >>>>> > > us >>>>> > > > >> the >>>>> > > > >> > > system >>>>> > > > >> > > > > >> to a >>>>> > > > >> > > > > >> >> lot of complexity. >>>>> > > > >> > > > > >> >> >>>>> > > > >> > > > > >> >> Thanks >>>>> > > > >> > > > > >> >> Eno >>>>> > > > >> > > > > >> >> >>>>> > > > >> > > > > >> >> >>>>> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin < >>>>> > lindon...@gmail.com >>>>> > > > >>>>> > > > >> wrote: >>>>> > > > >> > > > > >> >>> >>>>> > > > >> > > > > >> >>> Hi all, >>>>> > > > >> > > > > >> >>> >>>>> > > > >> > > > > >> >>> I am going to initiate the vote If there is no >>>>> > further >>>>> > > > >> concern >>>>> > > > >> > > with >>>>> > > > >> > > > > >> the >>>>> > > > >> > > > > >> >> KIP. >>>>> > > > >> > > > > >> >>> >>>>> > > > >> > > > > >> >>> Thanks, >>>>> > > > >> > > > > >> >>> Dong >>>>> > > > >> > > > > >> >>> >>>>> > > > >> > > > > >> >>> >>>>> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai < >>>>> > > > >> > > radai.rosenbl...@gmail.com> >>>>> > > > >> > > > > >> >> wrote: >>>>> > > > >> > > > > >> >>> >>>>> > > > >> > > > > >> >>>> a few extra points: >>>>> > > > >> > > > > >> >>>> >>>>> > > > >> > > > > >> >>>> 1. broker per disk might also incur more >>>>> client <--> >>>>> > > > >> broker >>>>> > > > >> > > sockets: >>>>> > > > >> > > > > >> >>>> suppose every producer / consumer "talks" to >>>>> >1 >>>>> > > > partition, >>>>> > > > >> > > there's a >>>>> > > > >> > > > > >> >> very >>>>> > > > >> > > > > >> >>>> good chance that partitions that were >>>>> co-located on >>>>> > a >>>>> > > > >> single >>>>> > > > >> > > 10-disk >>>>> > > > >> > > > > >> >> broker >>>>> > > > >> > > > > >> >>>> would now be split between several single-disk >>>>> > broker >>>>> > > > >> > > processes on >>>>> > > > >> > > > > >> the >>>>> > > > >> > > > > >> >> same >>>>> > > > >> > > > > >> >>>> machine. hard to put a multiplier on this, but >>>>> > likely >>>>> > > > >x1. >>>>> > > > >> > > sockets >>>>> > > > >> > > > > >> are a >>>>> > > > >> > > > > >> >>>> limited resource at the OS level and incur >>>>> some >>>>> > memory >>>>> > > > >> cost >>>>> > > > >> > > (kernel >>>>> > > > >> > > > > >> >>>> buffers) >>>>> > > > >> > > > > >> >>>> >>>>> > > > >> > > > > >> >>>> 2. 
there's a memory overhead to spinning up a >>>>> JVM >>>>> > > > >> (compiled >>>>> > > > >> > > code and >>>>> > > > >> > > > > >> >> byte >>>>> > > > >> > > > > >> >>>> code objects etc). if we assume this overhead >>>>> is >>>>> > ~300 >>>>> > > MB >>>>> > > > >> > > (order of >>>>> > > > >> > > > > >> >>>> magnitude, specifics vary) than spinning up >>>>> 10 JVMs >>>>> > > > would >>>>> > > > >> lose >>>>> > > > >> > > you 3 >>>>> > > > >> > > > > >> GB >>>>> > > > >> > > > > >> >> of >>>>> > > > >> > > > > >> >>>> RAM. not a ton, but non negligible. >>>>> > > > >> > > > > >> >>>> >>>>> > > > >> > > > > >> >>>> 3. there would also be some overhead >>>>> downstream of >>>>> > > kafka >>>>> > > > >> in any >>>>> > > > >> > > > > >> >> management >>>>> > > > >> > > > > >> >>>> / monitoring / log aggregation system. likely >>>>> less >>>>> > > than >>>>> > > > >> x10 >>>>> > > > >> > > though. >>>>> > > > >> > > > > >> >>>> >>>>> > > > >> > > > > >> >>>> 4. (related to above) - added complexity of >>>>> > > > administration >>>>> > > > >> > > with more >>>>> > > > >> > > > > >> >>>> running instances. >>>>> > > > >> > > > > >> >>>> >>>>> > > > >> > > > > >> >>>> is anyone running kafka with anywhere near >>>>> 100GB >>>>> > > heaps? >>>>> > > > i >>>>> > > > >> > > thought the >>>>> > > > >> > > > > >> >> point >>>>> > > > >> > > > > >> >>>> was to rely on kernel page cache to do the >>>>> disk >>>>> > > > buffering >>>>> > > > >> .... >>>>> > > > >> > > > > >> >>>> >>>>> > > > >> > > > > >> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin < >>>>> > > > >> > > lindon...@gmail.com> >>>>> > > > >> > > > > >> wrote: >>>>> > > > >> > > > > >> >>>> >>>>> > > > >> > > > > >> >>>>> Hey Colin, >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> Thanks much for the comment. Please see me >>>>> comment >>>>> > > > >> inline. >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin >>>>> McCabe < >>>>> > > > >> > > cmcc...@apache.org> >>>>> > > > >> > > > > >> >>>> wrote: >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin >>>>> wrote: >>>>> > > > >> > > > > >> >>>>>>> Hey Colin, >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> Good point! Yeah we have actually >>>>> considered and >>>>> > > > >> tested this >>>>> > > > >> > > > > >> >>>> solution, >>>>> > > > >> > > > > >> >>>>>>> which we call one-broker-per-disk. It >>>>> would work >>>>> > > and >>>>> > > > >> should >>>>> > > > >> > > > > >> require >>>>> > > > >> > > > > >> >>>> no >>>>> > > > >> > > > > >> >>>>>>> major change in Kafka as compared to this >>>>> JBOD >>>>> > KIP. >>>>> > > > So >>>>> > > > >> it >>>>> > > > >> > > would >>>>> > > > >> > > > > >> be a >>>>> > > > >> > > > > >> >>>>> good >>>>> > > > >> > > > > >> >>>>>>> short term solution. >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> But it has a few drawbacks which makes it >>>>> less >>>>> > > > >> desirable in >>>>> > > > >> > > the >>>>> > > > >> > > > > >> long >>>>> > > > >> > > > > >> >>>>>>> term. >>>>> > > > >> > > > > >> >>>>>>> Assume we have 10 disks on a machine. Here >>>>> are >>>>> > the >>>>> > > > >> problems: >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>> Hi Dong, >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>> Thanks for the thoughtful reply. 
>>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> 1) Our stress test result shows that >>>>> > > > >> one-broker-per-disk >>>>> > > > >> > > has 15% >>>>> > > > >> > > > > >> >>>> lower >>>>> > > > >> > > > > >> >>>>>>> throughput >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> 2) Controller would need to send 10X as >>>>> many >>>>> > > > >> > > LeaderAndIsrRequest, >>>>> > > > >> > > > > >> >>>>>>> MetadataUpdateRequest and >>>>> StopReplicaRequest. >>>>> > This >>>>> > > > >> > > increases the >>>>> > > > >> > > > > >> >>>> burden >>>>> > > > >> > > > > >> >>>>>>> on >>>>> > > > >> > > > > >> >>>>>>> controller which can be the performance >>>>> > bottleneck. >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>> Maybe I'm misunderstanding something, but >>>>> there >>>>> > > would >>>>> > > > >> not be >>>>> > > > >> > > 10x as >>>>> > > > >> > > > > >> >>>> many >>>>> > > > >> > > > > >> >>>>>> StopReplicaRequest RPCs, would there? The >>>>> other >>>>> > > > >> requests >>>>> > > > >> > > would >>>>> > > > >> > > > > >> >>>> increase >>>>> > > > >> > > > > >> >>>>>> 10x, but from a pretty low base, right? We >>>>> are >>>>> > not >>>>> > > > >> > > reassigning >>>>> > > > >> > > > > >> >>>>>> partitions all the time, I hope (or else we >>>>> have >>>>> > > > bigger >>>>> > > > >> > > > > >> problems...) >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> I think the controller will group >>>>> > StopReplicaRequest >>>>> > > > per >>>>> > > > >> > > broker and >>>>> > > > >> > > > > >> >> send >>>>> > > > >> > > > > >> >>>>> only one StopReplicaRequest to a broker >>>>> during >>>>> > > > controlled >>>>> > > > >> > > shutdown. >>>>> > > > >> > > > > >> >>>> Anyway, >>>>> > > > >> > > > > >> >>>>> we don't have to worry about this if we >>>>> agree that >>>>> > > > other >>>>> > > > >> > > requests >>>>> > > > >> > > > > >> will >>>>> > > > >> > > > > >> >>>>> increase by 10X. One MetadataRequest to send >>>>> to >>>>> > each >>>>> > > > >> broker >>>>> > > > >> > > in the >>>>> > > > >> > > > > >> >>>> cluster >>>>> > > > >> > > > > >> >>>>> every time there is leadership change. I am >>>>> not >>>>> > sure >>>>> > > > >> this is >>>>> > > > >> > > a real >>>>> > > > >> > > > > >> >>>>> problem. But in theory this makes the >>>>> overhead >>>>> > > > complexity >>>>> > > > >> > > O(number >>>>> > > > >> > > > > >> of >>>>> > > > >> > > > > >> >>>>> broker) and may be a concern in the future. >>>>> Ideally >>>>> > > we >>>>> > > > >> should >>>>> > > > >> > > avoid >>>>> > > > >> > > > > >> it. >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> 3) Less efficient use of physical resource >>>>> on the >>>>> > > > >> machine. >>>>> > > > >> > > The >>>>> > > > >> > > > > >> number >>>>> > > > >> > > > > >> >>>>> of >>>>> > > > >> > > > > >> >>>>>>> socket on each machine will increase by >>>>> 10X. The >>>>> > > > >> number of >>>>> > > > >> > > > > >> connection >>>>> > > > >> > > > > >> >>>>>>> between any two machine will increase by >>>>> 100X. >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> 4) Less efficient way to management memory >>>>> and >>>>> > > quota. 
>>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> 5) Rebalance between disks/brokers on the >>>>> same >>>>> > > > machine >>>>> > > > >> will >>>>> > > > >> > > less >>>>> > > > >> > > > > >> >>>>>>> efficient >>>>> > > > >> > > > > >> >>>>>>> and less flexible. Broker has to read data >>>>> from >>>>> > > > another >>>>> > > > >> > > broker on >>>>> > > > >> > > > > >> the >>>>> > > > >> > > > > >> >>>>>>> same >>>>> > > > >> > > > > >> >>>>>>> machine via socket. It is also harder to do >>>>> > > automatic >>>>> > > > >> load >>>>> > > > >> > > balance >>>>> > > > >> > > > > >> >>>>>>> between >>>>> > > > >> > > > > >> >>>>>>> disks on the same machine in the future. >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> I will put this and the explanation in the >>>>> > rejected >>>>> > > > >> > > alternative >>>>> > > > >> > > > > >> >>>>> section. >>>>> > > > >> > > > > >> >>>>>>> I >>>>> > > > >> > > > > >> >>>>>>> have a few questions: >>>>> > > > >> > > > > >> >>>>>>> >>>>> > > > >> > > > > >> >>>>>>> - Can you explain why this solution can >>>>> help >>>>> > avoid >>>>> > > > >> > > scalability >>>>> > > > >> > > > > >> >>>>>>> bottleneck? >>>>> > > > >> > > > > >> >>>>>>> I actually think it will exacerbate the >>>>> > scalability >>>>> > > > >> problem >>>>> > > > >> > > due >>>>> > > > >> > > > > >> the >>>>> > > > >> > > > > >> >>>> 2) >>>>> > > > >> > > > > >> >>>>>>> above. >>>>> > > > >> > > > > >> >>>>>>> - Why can we push more RPC with this >>>>> solution? >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>> To really answer this question we'd have to >>>>> take a >>>>> > > > deep >>>>> > > > >> dive >>>>> > > > >> > > into >>>>> > > > >> > > > > >> the >>>>> > > > >> > > > > >> >>>>>> locking of the broker and figure out how >>>>> > effectively >>>>> > > > it >>>>> > > > >> can >>>>> > > > >> > > > > >> >> parallelize >>>>> > > > >> > > > > >> >>>>>> truly independent requests. Almost every >>>>> > > > multithreaded >>>>> > > > >> > > process is >>>>> > > > >> > > > > >> >>>> going >>>>> > > > >> > > > > >> >>>>>> to have shared state, like shared queues or >>>>> shared >>>>> > > > >> sockets, >>>>> > > > >> > > that is >>>>> > > > >> > > > > >> >>>>>> going to make scaling less than linear when >>>>> you >>>>> > add >>>>> > > > >> disks or >>>>> > > > >> > > > > >> >>>> processors. >>>>> > > > >> > > > > >> >>>>>> (And clearly, another option is to improve >>>>> that >>>>> > > > >> scalability, >>>>> > > > >> > > rather >>>>> > > > >> > > > > >> >>>>>> than going multi-process!) >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> Yeah I also think it is better to improve >>>>> > scalability >>>>> > > > >> inside >>>>> > > > >> > > kafka >>>>> > > > >> > > > > >> code >>>>> > > > >> > > > > >> >>>> if >>>>> > > > >> > > > > >> >>>>> possible. I am not sure we currently have any >>>>> > > > scalability >>>>> > > > >> > > issue >>>>> > > > >> > > > > >> inside >>>>> > > > >> > > > > >> >>>>> Kafka that can not be removed without using >>>>> > > > >> multi-process. >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>>> - It is true that a garbage collection in >>>>> one >>>>> > > broker >>>>> > > > >> would >>>>> > > > >> > > not >>>>> > > > >> > > > > >> affect >>>>> > > > >> > > > > >> >>>>>>> others. 
But that is after every broker >>>>> only uses >>>>> > > 1/10 >>>>> > > > >> of the >>>>> > > > >> > > > > >> memory. >>>>> > > > >> > > > > >> >>>>> Can >>>>> > > > >> > > > > >> >>>>>>> we be sure that this will actually help >>>>> > > performance? >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>> The big question is, how much memory do >>>>> Kafka >>>>> > > brokers >>>>> > > > >> use >>>>> > > > >> > > now, and >>>>> > > > >> > > > > >> how >>>>> > > > >> > > > > >> >>>>>> much will they use in the future? Our >>>>> experience >>>>> > in >>>>> > > > >> HDFS >>>>> > > > >> > > was that >>>>> > > > >> > > > > >> >> once >>>>> > > > >> > > > > >> >>>>>> you start getting more than 100-200GB Java >>>>> heap >>>>> > > sizes, >>>>> > > > >> full >>>>> > > > >> > > GCs >>>>> > > > >> > > > > >> start >>>>> > > > >> > > > > >> >>>>>> taking minutes to finish when using the >>>>> standard >>>>> > > JVMs. >>>>> > > > >> That >>>>> > > > >> > > alone >>>>> > > > >> > > > > >> is >>>>> > > > >> > > > > >> >> a >>>>> > > > >> > > > > >> >>>>>> good reason to go multi-process or consider >>>>> > storing >>>>> > > > more >>>>> > > > >> > > things off >>>>> > > > >> > > > > >> >> the >>>>> > > > >> > > > > >> >>>>>> Java heap. >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> I see. Now I agree one-broker-per-disk >>>>> should be >>>>> > more >>>>> > > > >> > > efficient in >>>>> > > > >> > > > > >> >> terms >>>>> > > > >> > > > > >> >>>> of >>>>> > > > >> > > > > >> >>>>> GC since each broker probably needs less >>>>> than 1/10 >>>>> > of >>>>> > > > the >>>>> > > > >> > > memory >>>>> > > > >> > > > > >> >>>> available >>>>> > > > >> > > > > >> >>>>> on a typical machine nowadays. I will remove >>>>> this >>>>> > > from >>>>> > > > >> the >>>>> > > > >> > > reason of >>>>> > > > >> > > > > >> >>>>> rejection. >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>> >>>>> > > > >> > > > > >> >>>>>> >>>>> > > > >> > > > > >> >>>>>> Disk failure is the "easy" case. The >>>>> "hard" case, >>>>> > > > >> which is >>>>> > > > >> > > > > >> >>>>>> unfortunately also the much more common >>>>> case, is >>>>> > > disk >>>>> > > > >> > > misbehavior. >>>>> > > > >> > > > > >> >>>>>> Towards the end of their lives, disks tend >>>>> to >>>>> > start >>>>> > > > >> slowing >>>>> > > > >> > > down >>>>> > > > >> > > > > >> >>>>>> unpredictably. Requests that would have >>>>> completed >>>>> > > > >> > > immediately >>>>> > > > >> > > > > >> before >>>>> > > > >> > > > > >> >>>>>> start taking 20, 100 500 milliseconds. >>>>> Some files >>>>> > > may >>>>> > > > >> be >>>>> > > > >> > > readable >>>>> > > > >> > > > > >> and >>>>> > > > >> > > > > >> >>>>>> other files may not be. System calls hang, >>>>> > > sometimes >>>>> > > > >> > > forever, and >>>>> > > > >> > > > > >> the >>>>> > > > >> > > > > >> >>>>>> Java process can't abort them, because the >>>>> hang is >>>>> > > in >>>>> > > > >> the >>>>> > > > >> > > kernel. >>>>> > > > >> > > > > >> It >>>>> > > > >> > > > > >> >>>> is >>>>> > > > >> > > > > >> >>>>>> not fun when threads are stuck in "D state" >>>>> > > > >> > > > > >> >>>>>> http://stackoverflow.com/quest >>>>> > > > >> ions/20423521/process-perminan >>>>> > > > >> > > > > >> >>>>>> tly-stuck-on-d-state >>>>> > > > >> > > > > >> >>>>>> . Even kill -9 cannot abort the thread >>>>> then. >>>>> > > > >> Fortunately, >>>>> > > > >> > > this is >>>>> > > > >> > > > > >> >>>>>> rare. 
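Side note on the slow-disk case: the future-work item mentioned earlier in this thread (detect a slow disk and consider it offline) could, in its simplest form, time a small synced write against a threshold. The sketch below is only an illustration of that future-work idea, not part of this KIP; the class name and the threshold are arbitrary.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: flag a log directory as suspect when a tiny fsync'd
// write takes suspiciously long.
public class SlowDiskProbe {

    private static final long SLOW_THRESHOLD_MS = 500;  // example value only

    public static boolean looksSlow(Path logDir) throws IOException {
        Path probeFile = logDir.resolve(".disk-probe");
        long start = System.nanoTime();
        try (FileChannel channel = FileChannel.open(probeFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(new byte[]{0}));
            channel.force(true);  // push the write all the way to the device
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return elapsedMs > SLOW_THRESHOLD_MS;
    }
}

A probe like this obviously cannot help with the D-state hangs described above, since the probe itself would block; it only illustrates why slow-disk detection is deferred to future work rather than handled here.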
Colin:
Another approach we should consider is for Kafka to implement its own storage layer that would stripe across multiple disks. This wouldn't have to be done at the block level, but could be done at the file level. We could use consistent hashing to determine which disks a file should end up on, for example.

Dong:
Are you suggesting that we should distribute the log, or log segments, across the disks of brokers? I am not sure I fully understand this approach. My gut feeling is that this would be a drastic solution that would require non-trivial design. While this may be useful to Kafka, I would prefer not to discuss it in detail in this thread unless you believe it is strictly superior to the design in this KIP in terms of solving our use case.
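For reference, the file-level striping idea could in principle look like a consistent-hashing ring over the log directories. The sketch below is purely illustrative; the class name, hash choice, and layout are assumptions, not anything that exists in Kafka or in this KIP.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical: place each log file on a disk chosen by consistent hashing,
// so adding or losing a disk only remaps a fraction of the files.
public class DiskRing {
    private final SortedMap<Long, String> ring = new TreeMap<>();

    DiskRing(List<String> logDirs, int virtualNodesPerDisk) {
        for (String dir : logDirs)
            for (int v = 0; v < virtualNodesPerDisk; v++)
                ring.put(hash(dir + "#" + v), dir);
    }

    // Returns the log directory a file (e.g. a segment) should live on:
    // the first ring position at or after the file's hash, wrapping around.
    String dirFor(String fileName) {
        SortedMap<Long, String> tail = ring.tailMap(hash(fileName));
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }

    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xffL);
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DiskRing ring = new DiskRing(Arrays.asList("/data1", "/data2", "/data3"), 16);
        System.out.println(ring.dirFor("my-topic-0/00000000000000000000.log"));
    }
}

Losing a disk would then remap only the files that hashed to it rather than reshuffling everything, which is the usual argument for consistent hashing; the hard parts (recovery, replication, index layout) are exactly the non-trivial design Dong refers to.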
best,
Colin

Thanks,
Dong

On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org> wrote:

Hi Dong,

Thanks for the writeup! It's very interesting.

I apologize in advance if this has been discussed somewhere else. But I am curious if you have considered the solution of running multiple brokers per node. Clearly there is a memory overhead with this solution because of the fixed cost of starting multiple JVMs. However, running multiple JVMs would help avoid scalability bottlenecks. You could probably push more RPCs per second, for example. A garbage collection in one broker would not affect the others. It would be interesting to see this considered in the "alternate designs" section, even if you end up deciding it's not the way to go.

best,
Colin

On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:

Hi all,

We created KIP-112: Handle disk failure for JBOD. Please find the KIP wiki at https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD.

This KIP is related to KIP-113 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>: Support replicas movement between log directories. They are both needed in order to support JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!

Thanks,
Dong