And the test plan has also been updated to simulate disk failure by changing the log directory permission to 000.
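For reference, a minimal sketch of that simulation step in plain Java NIO (the log directory path is hypothetical, and the real system test may perform the equivalent chmod differently):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.attribute.PosixFilePermissions;

    public class SimulateLogDirFailure {
        public static void main(String[] args) throws Exception {
            // Hypothetical log directory used by the broker under test.
            Path logDir = Paths.get("/tmp/kafka-logs-1");

            // Revoke all permissions (equivalent to chmod 000) so that any subsequent
            // read or write by the broker fails with an IOException.
            Files.setPosixFilePermissions(logDir, PosixFilePermissions.fromString("---------"));

            // ... exercise the broker and verify it marks the directory offline,
            // then restore permissions so the test can clean up.
            Files.setPosixFilePermissions(logDir, PosixFilePermissions.fromString("rwxr-xr-x"));
        }
    }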
On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin <lindon...@gmail.com> wrote: > Hi Jun, > > Thanks for the reply. These comments are very helpful. Let me answer them > inline. > > > On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <j...@confluent.io> wrote: > >> Hi, Dong, >> >> Thanks for the reply. A few more replies and new comments below. >> >> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindon...@gmail.com> wrote: >> >> > Hi Jun, >> > >> > Thanks for the detailed comments. Please see answers inline: >> > >> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <j...@confluent.io> wrote: >> > >> > > Hi, Dong, >> > > >> > > Thanks for the updated wiki. A few comments below. >> > > >> > > 1. Topics get created >> > > 1.1 Instead of storing successfully created replicas in ZK, could we >> > store >> > > unsuccessfully created replicas in ZK? Since the latter is less >> common, >> > it >> > > probably reduces the load on ZK. >> > > >> > >> > We can store unsuccessfully created replicas in ZK. But I am not sure if >> > that can reduce write load on ZK. >> > >> > If we want to reduce write load on ZK using by store unsuccessfully >> created >> > replicas in ZK, then broker should not write to ZK if all replicas are >> > successfully created. It means that if /broker/topics/[topic]/partiti >> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a >> given >> > partition, we have to assume all replicas of this partition have been >> > successfully created and send LeaderAndIsrRequest with create = false. >> This >> > becomes a problem if controller crashes before receiving >> > LeaderAndIsrResponse to validate whether a replica has been created. >> > >> > I think this approach and reduce the number of bytes stored in ZK. But >> I am >> > not sure if this is a concern. >> > >> > >> > >> I was mostly concerned about the controller failover time. Currently, the >> controller failover is likely dominated by the cost of reading >> topic/partition level information from ZK. If we add another partition >> level path in ZK, it probably will double the controller failover time. If >> the approach of representing the non-created replicas doesn't work, have >> you considered just adding the created flag in the leaderAndIsr path in >> ZK? >> >> > Yes, I have considered adding the created flag in the leaderAndIsr path in > ZK. If we were to add created flag per replica in the LeaderAndIsrRequest, > then it requires a lot of change in the code base. > > If we don't add created flag per replica in the LeaderAndIsrRequest, then > the information in leaderAndIsr path in ZK and LeaderAndIsrRequest would be > different. Further, the procedure for broker to update ISR in ZK will be a > bit complicated. When leader updates leaderAndIsr path in ZK, it will have > to first read created flags from ZK, change isr, and write leaderAndIsr > back to ZK. And it needs to check znode version and re-try write operation > in ZK if controller has updated ZK during this period. This is in contrast > to the current implementation where the leader either gets all the > information from LeaderAndIsrRequest sent by controller, or determine the > infromation by itself (e.g. ISR), before writing to leaderAndIsr path in ZK. > > It seems to me that the above solution is a bit complicated and not clean. > Thus I come up with the design in this KIP to store this created flag in a > separate zk path. 
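As a rough sketch, the read-modify-write cycle that the rejected leaderAndIsr approach would force on the leader looks like the following with the plain ZooKeeper client (the znode path, payload and merge logic are illustrative only, not actual broker code):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    // Illustrative only: shows the version-checked update/retry pattern the leader
    // would need if "created" flags lived in the leaderAndIsr znode.
    public class LeaderAndIsrUpdater {
        public static void updateIsr(ZooKeeper zk, String path, byte[] newIsrState)
                throws KeeperException, InterruptedException {
            while (true) {
                Stat stat = new Stat();
                byte[] current = zk.getData(path, false, stat);   // read created flags + ISR
                byte[] merged = merge(current, newIsrState);      // hypothetical merge of the ISR change
                try {
                    // Conditional write: fails if the controller updated the znode in between.
                    zk.setData(path, merged, stat.getVersion());
                    return;
                } catch (KeeperException.BadVersionException e) {
                    // The controller won the race; re-read and retry.
                }
            }
        }

        private static byte[] merge(byte[] current, byte[] newIsrState) {
            return newIsrState; // placeholder for real merge logic
        }
    }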
The path is named controller_managed_state to indicate > that we can store in this znode all information that is managed by > controller only, as opposed to ISR. > > I agree with your concern of increased ZK read time during controller > failover. How about we store the "created" information in the > znode /brokers/topics/[topic]? We can change that znode to have the > following data format: > > { > "version" : 2, > "created" : { > "1" : [1, 2, 3], > ... > } > "partition" : { > "1" : [1, 2, 3], > ... > } > } > > We won't have extra zk read using this solution. It also seems reasonable > to put the partition assignment information together with replica creation > information. The latter is only changed once after the partition is created > or re-assigned. > > >> >> >> > >> > > 1.2 If an error is received for a follower, does the controller >> eagerly >> > > remove it from ISR or do we just let the leader removes it after >> timeout? >> > > >> > >> > No, Controller will not actively remove it from ISR. But controller will >> > recognize it as offline replica and propagate this information to all >> > brokers via UpdateMetadataRequest. Each leader can use this information >> to >> > actively remove offline replica from ISR set. I have updated to wiki to >> > clarify it. >> > >> > >> >> That seems inconsistent with how the controller deals with offline >> replicas >> due to broker failures. When that happens, the broker will (1) select a >> new >> leader if the offline replica is the leader; (2) remove the replica from >> ISR if the offline replica is the follower. So, intuitively, it seems that >> we should be doing the same thing when dealing with offline replicas due >> to >> disk failure. >> > > My bad. I misunderstand how the controller currently handles broker > failure and ISR change. Yes we should do the same thing when dealing with > offline replicas here. I have updated the KIP to specify that, when an > offline replica is discovered by controller, the controller removes offline > replicas from ISR in the ZK and sends LeaderAndIsrRequest with updated ISR > to be used by partition leaders. > > >> >> >> >> > >> > > 1.3 Similar, if an error is received for a leader, should the >> controller >> > > trigger leader election again? >> > > >> > >> > Yes, controller will trigger leader election if leader replica is >> offline. >> > I have updated the wiki to clarify it. >> > >> > >> > > >> > > 2. A log directory stops working on a broker during runtime: >> > > 2.1 It seems the broker remembers the failed directory after hitting >> an >> > > IOException and the failed directory won't be used for creating new >> > > partitions until the broker is restarted? If so, could you add that to >> > the >> > > wiki. >> > > >> > >> > Right, broker assumes a log directory to be good after it starts, and >> mark >> > log directory as bad once there is IOException when broker attempts to >> > access the log directory. New replicas will only be created on good log >> > directory. I just added this to the KIP. >> > >> > >> > > 2.2 Could you be a bit more specific on how and during which operation >> > the >> > > broker detects directory failure? Is it when the broker hits an >> > IOException >> > > during writes, or both reads and writes? For example, during broker >> > > startup, it only reads from each of the log directories, if it hits an >> > > IOException there, does the broker immediately mark the directory as >> > > offline? 
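For readability, here is the /brokers/topics/[topic] format proposed a few paragraphs above, rendered as valid JSON (the partition id and replica lists are placeholders):

    {
      "version" : 2,
      "created" : {
        "1" : [1, 2, 3]
      },
      "partition" : {
        "1" : [1, 2, 3]
      }
    }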
>> > > >> > >> > Broker marks log directory as bad once there is IOException when broker >> > attempts to access the log directory. This includes read and write. >> These >> > operations include log append, log read, log cleaning, watermark >> checkpoint >> > etc. If broker hits IOException when it reads from each of the log >> > directory during startup, it immediately mark the directory as offline. >> > >> > I just updated the KIP to clarify it. >> > >> > >> > > 3. Partition reassignment: If we know a replica is offline, do we >> still >> > > want to send StopReplicaRequest to it? >> > > >> > >> > No, controller doesn't send StopReplicaRequest for an offline replica. >> > Controller treats this scenario in the same way that exiting Kafka >> > implementation does when the broker of this replica is offline. >> > >> > >> > > >> > > 4. UpdateMetadataRequestPartitionState: For offline_replicas, do they >> > only >> > > include offline replicas due to log directory failures or do they also >> > > include offline replicas due to broker failure? >> > > >> > >> > UpdateMetadataRequestPartitionState's offline_replicas include offline >> > replicas due to both log directory failure and broker failure. This is >> to >> > make the semantics of this field easier to understand. Broker can >> > distinguish whether a replica is offline due to broker failure or disk >> > failure by checking whether a broker is live in the >> UpdateMetadataRequest. >> > >> > >> > > >> > > 5. Tools: Could we add some kind of support in the tool to list >> offline >> > > directories? >> > > >> > >> > In KIP-112 we don't have tools to list offline directories because we >> have >> > intentionally avoided exposing log directory information (e.g. log >> > directory path) to user or other brokers. I think we can add this >> feature >> > in KIP-113, in which we will have DescribeDirsRequest to list log >> directory >> > information (e.g. partition assignment, path, size) needed for >> rebalance. >> > >> > >> Since we are introducing a new failure mode, if a replica becomes offline >> due to failure in log directories, the first thing an admin wants to know >> is which log directories are offline from the broker's perspective. So, >> including such a tool will be useful. Do you plan to do KIP-112 and >> KIP-113 >> in the same release? >> >> > Yes, I agree that including such a tool is using. This is probably better > to be added in KIP-113 because we need DescribeDirsRequest to get this > information. I will update KIP-113 to include this tool. > > I plan to do KIP-112 and KIP-113 separately to make each KIP and their > patch easier to review. I don't have any plan about which release to have > these KIPs. My plan is to both of them ASAP. Is there particular timeline > you prefer for code of these two KIPs to checked-in? > > >> > >> > > >> > > 6. Metrics: Could we add some metrics to show offline directories? >> > > >> > >> > Sure. I think it makes sense to have each broker report its number of >> > offline replicas and offline log directories. The previous metric was >> put >> > in KIP-113. I just added both metrics in KIP-112. >> > >> > >> > > >> > > 7. There are still references to kafka-log-dirs.sh. Are they valid? >> > > >> > >> > My bad. I just removed this from "Changes in Operational Procedures" and >> > "Test Plan" in the KIP. >> > >> > >> > > >> > > 8. Do you think KIP-113 is ready for review? 
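A minimal sketch of the IOException-based detection described in the answer to 2.2 above (class, method and field names are made up for illustration and are not the actual LogManager code):

    import java.io.IOException;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative pattern only: every access to a log directory is wrapped so
    // that the first IOException marks the whole directory offline.
    public class LogDirHealth {
        private final Set<String> offlineDirs = ConcurrentHashMap.newKeySet();

        public interface IoAction<T> { T run() throws IOException; }

        public <T> T access(String logDir, IoAction<T> action) throws IOException {
            if (offlineDirs.contains(logDir)) {
                throw new IOException("Log directory is offline: " + logDir);
            }
            try {
                return action.run();   // log append, log read, log cleaning, checkpoint, ...
            } catch (IOException e) {
                offlineDirs.add(logDir);   // stays bad until the broker is restarted
                // notify the controller here (e.g. via the ZooKeeper notification path)
                throw e;
            }
        }

        public boolean isOnline(String logDir) { return !offlineDirs.contains(logDir); }
    }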
One thing that KIP-113 >> > > mentions during partition reassignment is to first send >> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It seems >> it's >> > > better if the replicas are created in the right log directory in the >> > first >> > > place? The reason that I brought it up here is because it may affect >> the >> > > protocol of LeaderAndIsrRequest. >> > > >> > >> > Yes, KIP-113 is ready for review. The advantage of the current design is >> > that we can keep LeaderAndIsrRequest log-direcotry-agnostic. The >> > implementation would be much easier to read if all log related logic >> (e.g. >> > various errors) are put in ChangeReplicadIRrequest and the code path of >> > handling replica movement is separated from leadership handling. >> > >> > In other words, I think Kafka may be easier to develop in the long term >> if >> > we separate these two requests. >> > >> > I agree that ideally we want to create replicas in the right log >> directory >> > in the first place. But I am not sure if there is any performance or >> > correctness concern with the existing way of moving it after it is >> created. >> > Besides, does this decision affect the change proposed in KIP-112? >> > >> > >> I am just wondering if you have considered including the log directory for >> the replicas in the LeaderAndIsrRequest. >> >> > Yeah I have thought about this idea, but only briefly. I rejected this > idea because log directory is broker's local information and I prefer not > to expose local config information to the cluster through > LeaderAndIsrRequest. > > >> 9. Could you describe when the offline replicas due to log directory >> failure are removed from the replica fetch threads? >> > > Yes. If the offline replica was a leader, either a new leader is elected > or all follower brokers will stop fetching for this partition. If the > offline replica is a follower, the broker will stop fetching for this > replica immediately. A broker stops fetching data for a replica by removing > the replica from the replica fetch threads. I have updated the KIP to > clarify it. > > >> >> 10. The wiki mentioned changing the log directory to a file for simulating >> disk failure in system tests. Could we just change the permission of the >> log directory to 000 to simulate that? >> > > > Sure, > > >> >> Thanks, >> >> Jun >> >> >> > > Jun >> > > >> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin <lindon...@gmail.com> >> wrote: >> > > >> > > > Hi Jun, >> > > > >> > > > Can I replace zookeeper access with direct RPC for both ISR >> > notification >> > > > and disk failure notification in a future KIP, or do you feel we >> should >> > > do >> > > > it in this KIP? >> > > > >> > > > Hi Eno, Grant and everyone, >> > > > >> > > > Is there further improvement you would like to see with this KIP? >> > > > >> > > > Thanks you all for the comments, >> > > > >> > > > Dong >> > > > >> > > > >> > > > >> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin <lindon...@gmail.com> >> wrote: >> > > > >> > > > > >> > > > > >> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe <cmcc...@apache.org> >> > > wrote: >> > > > > >> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote: >> > > > >> > Thanks for all the comments Colin! >> > > > >> > >> > > > >> > To answer your questions: >> > > > >> > - Yes, a broker will shutdown if all its log directories are >> bad. >> > > > >> >> > > > >> That makes sense. Can you add this to the writeup? >> > > > >> >> > > > > >> > > > > Sure. This has already been added. 
You can find it here >> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversio >> n.action >> > ? >> > > > pageId=67638402&selectedPageVersions=9&selectedPageVersions=10> >> > > > > . >> > > > > >> > > > > >> > > > >> >> > > > >> > - I updated the KIP to explicitly state that a log directory >> will >> > be >> > > > >> > assumed to be good until broker sees IOException when it tries >> to >> > > > access >> > > > >> > the log directory. >> > > > >> >> > > > >> Thanks. >> > > > >> >> > > > >> > - Controller doesn't explicitly know whether there is new log >> > > > directory >> > > > >> > or >> > > > >> > not. All controller knows is whether replicas are online or >> > offline >> > > > >> based >> > > > >> > on LeaderAndIsrResponse. According to the existing Kafka >> > > > implementation, >> > > > >> > controller will always send LeaderAndIsrRequest to a broker >> after >> > it >> > > > >> > bounces. >> > > > >> >> > > > >> I thought so. It's good to clarify, though. Do you think it's >> > worth >> > > > >> adding a quick discussion of this on the wiki? >> > > > >> >> > > > > >> > > > > Personally I don't think it is needed. If broker starts with no >> bad >> > log >> > > > > directory, everything should work it is and we should not need to >> > > clarify >> > > > > it. The KIP has already covered the scenario when a broker starts >> > with >> > > > bad >> > > > > log directory. Also, the KIP doesn't claim or hint that we support >> > > > dynamic >> > > > > addition of new log directories. I think we are good. >> > > > > >> > > > > >> > > > >> best, >> > > > >> Colin >> > > > >> >> > > > >> > >> > > > >> > Please see this >> > > > >> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversio >> > > > >> n.action?pageId=67638402&selectedPageVersions=9& >> > > > selectedPageVersions=10> >> > > > >> > for the change of the KIP. >> > > > >> > >> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe < >> cmcc...@apache.org >> > > >> > > > >> wrote: >> > > > >> > >> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote: >> > > > >> > > > Thanks, Dong L. >> > > > >> > > > >> > > > >> > > > Do we plan on bringing down the broker process when all log >> > > > >> directories >> > > > >> > > > are offline? >> > > > >> > > > >> > > > >> > > > Can you explicitly state on the KIP that the log dirs are >> all >> > > > >> considered >> > > > >> > > > good after the broker process is bounced? It seems like an >> > > > >> important >> > > > >> > > > thing to be clear about. Also, perhaps discuss how the >> > > controller >> > > > >> > > > becomes aware of the newly good log directories after a >> broker >> > > > >> bounce >> > > > >> > > > (and whether this triggers re-election). >> > > > >> > > >> > > > >> > > I meant to write, all the log dirs where the broker can still >> > read >> > > > the >> > > > >> > > index and some other files. Clearly, log dirs that are >> > completely >> > > > >> > > inaccessible will still be considered bad after a broker >> process >> > > > >> bounce. >> > > > >> > > >> > > > >> > > best, >> > > > >> > > Colin >> > > > >> > > >> > > > >> > > > >> > > > >> > > > +1 (non-binding) aside from that >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote: >> > > > >> > > > > Hi all, >> > > > >> > > > > >> > > > >> > > > > Thank you all for the helpful suggestion. I have updated >> the >> > > KIP >> > > > >> to >> > > > >> > > > > address >> > > > >> > > > > the comments received so far. 
See here >> > > > >> > > > > <https://cwiki.apache.org/conf >> > luence/pages/diffpagesbyversio >> > > > >> n.action? >> > > > >> > > pageId=67638402&selectedPageVersions=8&selectedPageVersions= >> > 9>to >> > > > >> > > > > read the changes of the KIP. Here is a summary of change: >> > > > >> > > > > >> > > > >> > > > > - Updated the Proposed Change section to change the >> recovery >> > > > >> steps. >> > > > >> > > After >> > > > >> > > > > this change, broker will also create replica as long as >> all >> > > log >> > > > >> > > > > directories >> > > > >> > > > > are working. >> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since user no >> > longer >> > > > >> needs to >> > > > >> > > > > use >> > > > >> > > > > it for recovery from bad disks. >> > > > >> > > > > - Explained how the znode controller_managed_state is >> > managed >> > > in >> > > > >> the >> > > > >> > > > > Public >> > > > >> > > > > interface section. >> > > > >> > > > > - Explained what happens during controller failover, >> > partition >> > > > >> > > > > reassignment >> > > > >> > > > > and topic deletion in the Proposed Change section. >> > > > >> > > > > - Updated Future Work section to include the following >> > > potential >> > > > >> > > > > improvements >> > > > >> > > > > - Let broker notify controller of ISR change and disk >> > state >> > > > >> change >> > > > >> > > via >> > > > >> > > > > RPC instead of using zookeeper >> > > > >> > > > > - Handle various failure scenarios (e.g. slow disk) on >> a >> > > > >> case-by-case >> > > > >> > > > > basis. For example, we may want to detect slow disk and >> > > consider >> > > > >> it as >> > > > >> > > > > offline. >> > > > >> > > > > - Allow admin to mark a directory as bad so that it >> will >> > not >> > > > be >> > > > >> used. >> > > > >> > > > > >> > > > >> > > > > Thanks, >> > > > >> > > > > Dong >> > > > >> > > > > >> > > > >> > > > > >> > > > >> > > > > >> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin < >> > lindon...@gmail.com >> > > > >> > > > >> wrote: >> > > > >> > > > > >> > > > >> > > > > > Hey Eno, >> > > > >> > > > > > >> > > > >> > > > > > Thanks much for the comment! >> > > > >> > > > > > >> > > > >> > > > > > I still think the complexity added to Kafka is >> justified >> > by >> > > > its >> > > > >> > > benefit. >> > > > >> > > > > > Let me provide my reasons below. >> > > > >> > > > > > >> > > > >> > > > > > 1) The additional logic is easy to understand and thus >> its >> > > > >> complexity >> > > > >> > > > > > should be reasonable. >> > > > >> > > > > > >> > > > >> > > > > > On the broker side, it needs to catch exception when >> > access >> > > > log >> > > > >> > > directory, >> > > > >> > > > > > mark log directory and all its replicas as offline, >> notify >> > > > >> > > controller by >> > > > >> > > > > > writing the zookeeper notification path, and specify >> error >> > > in >> > > > >> > > > > > LeaderAndIsrResponse. On the controller side, it will >> > > listener >> > > > >> to >> > > > >> > > > > > zookeeper for disk failure notification, learn about >> > offline >> > > > >> > > replicas in >> > > > >> > > > > > the LeaderAndIsrResponse, and take offline replicas >> into >> > > > >> > > consideration when >> > > > >> > > > > > electing leaders. It also mark replica as created in >> > > zookeeper >> > > > >> and >> > > > >> > > use it >> > > > >> > > > > > to determine whether a replica is created. >> > > > >> > > > > > >> > > > >> > > > > > That is all the logic we need to add in Kafka. 
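As a loose illustration of the broker-to-controller notification just listed, here is the generic ZooKeeper pattern (the znode path and payload are assumptions for the sketch, not the names this KIP will use, and the notification parent znode is assumed to already exist):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    // Illustrative only: the path and payload are hypothetical.
    public class DiskFailureNotifier {
        private static final String NOTIFICATION_PATH = "/disk_failure_notifications";

        // Broker side: drop a sequential znode that the controller watches for.
        public static void notifyController(ZooKeeper zk, int brokerId)
                throws KeeperException, InterruptedException {
            byte[] payload = String.valueOf(brokerId).getBytes();
            zk.create(NOTIFICATION_PATH + "/notification-", payload,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        }

        // Controller side: watch for new children, then learn which replicas are
        // offline from the LeaderAndIsrResponse as described above.
        public static void watchNotifications(ZooKeeper zk)
                throws KeeperException, InterruptedException {
            zk.getChildren(NOTIFICATION_PATH, event -> {
                // re-register the watch and process new notifications here
            });
        }
    }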
I >> > personally >> > > > feel >> > > > >> > > this is >> > > > >> > > > > > easy to reason about. >> > > > >> > > > > > >> > > > >> > > > > > 2) The additional code is not much. >> > > > >> > > > > > >> > > > >> > > > > > I expect the code for KIP-112 to be around 1100 lines >> new >> > > > code. >> > > > >> > > Previously >> > > > >> > > > > > I have implemented a prototype of a slightly different >> > > design >> > > > >> (see >> > > > >> > > here >> > > > >> > > > > > <https://docs.google.com/docum >> ent/d/1Izza0SBmZMVUBUt9s_ >> > > > >> > > -Dqi3D8e0KGJQYW8xgEdRsgAI/edit>) >> > > > >> > > > > > and uploaded it to github (see here >> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD>). The >> > patch >> > > > >> changed >> > > > >> > > 33 >> > > > >> > > > > > files, added 1185 lines and deleted 183 lines. The >> size of >> > > > >> prototype >> > > > >> > > patch >> > > > >> > > > > > is actually smaller than patch of KIP-107 (see here >> > > > >> > > > > > <https://github.com/apache/kafka/pull/2476>) which is >> > > already >> > > > >> > > accepted. >> > > > >> > > > > > The KIP-107 patch changed 49 files, added 1349 lines >> and >> > > > >> deleted 141 >> > > > >> > > lines. >> > > > >> > > > > > >> > > > >> > > > > > 3) Comparison with one-broker-per-multiple-volumes >> > > > >> > > > > > >> > > > >> > > > > > This KIP can improve the availability of Kafka in this >> > case >> > > > such >> > > > >> > > that one >> > > > >> > > > > > failed volume doesn't bring down the entire broker. >> > > > >> > > > > > >> > > > >> > > > > > 4) Comparison with one-broker-per-volume >> > > > >> > > > > > >> > > > >> > > > > > If each volume maps to multiple disks, then we still >> have >> > > > >> similar >> > > > >> > > problem >> > > > >> > > > > > such that the broker will fail if any disk of the >> volume >> > > > failed. >> > > > >> > > > > > >> > > > >> > > > > > If each volume maps to one disk, it means that we need >> to >> > > > >> deploy 10 >> > > > >> > > > > > brokers on a machine if the machine has 10 disks. I >> will >> > > > >> explain the >> > > > >> > > > > > concern with this approach in order of their >> importance. >> > > > >> > > > > > >> > > > >> > > > > > - It is weird if we were to tell kafka user to deploy >> 50 >> > > > >> brokers on a >> > > > >> > > > > > machine of 50 disks. >> > > > >> > > > > > >> > > > >> > > > > > - Either when user deploys Kafka on a commercial cloud >> > > > platform >> > > > >> or >> > > > >> > > when >> > > > >> > > > > > user deploys their own cluster, the size or largest >> disk >> > is >> > > > >> usually >> > > > >> > > > > > limited. There will be scenarios where user want to >> > increase >> > > > >> broker >> > > > >> > > > > > capacity by having multiple disks per broker. This JBOD >> > KIP >> > > > >> makes it >> > > > >> > > > > > feasible without hurting availability due to single >> disk >> > > > >> failure. >> > > > >> > > > > > >> > > > >> > > > > > - Automatic load rebalance across disks will be easier >> and >> > > > more >> > > > >> > > flexible >> > > > >> > > > > > if one broker has multiple disks. This can be future >> work. >> > > > >> > > > > > >> > > > >> > > > > > - There is performance concern when you deploy 10 >> broker >> > > vs. 1 >> > > > >> > > broker on >> > > > >> > > > > > one machine. The metadata the cluster, including >> > > FetchRequest, >> > > > >> > > > > > ProduceResponse, MetadataRequest and so on will all be >> 10X >> > > > >> more. 
The >> > > > >> > > > > > packet-per-second will be 10X higher which may limit >> > > > >> performance if >> > > > >> > > pps is >> > > > >> > > > > > the performance bottleneck. The number of socket on the >> > > > machine >> > > > >> is >> > > > >> > > 10X >> > > > >> > > > > > higher. And the number of replication thread will be >> 100X >> > > > more. >> > > > >> The >> > > > >> > > impact >> > > > >> > > > > > will be more significant with increasing number of >> disks >> > per >> > > > >> > > machine. Thus >> > > > >> > > > > > it will limit Kakfa's scalability in the long term. >> > > > >> > > > > > >> > > > >> > > > > > Thanks, >> > > > >> > > > > > Dong >> > > > >> > > > > > >> > > > >> > > > > > >> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska < >> > > > >> eno.there...@gmail.com >> > > > >> > > > >> > > > >> > > > > > wrote: >> > > > >> > > > > > >> > > > >> > > > > >> Hi Dong, >> > > > >> > > > > >> >> > > > >> > > > > >> To simplify the discussion today, on my part I'll zoom >> > into >> > > > one >> > > > >> > > thing >> > > > >> > > > > >> only: >> > > > >> > > > > >> >> > > > >> > > > > >> - I'll discuss the options called below : >> > > > >> "one-broker-per-disk" or >> > > > >> > > > > >> "one-broker-per-few-disks". >> > > > >> > > > > >> >> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments so >> there is >> > > no >> > > > >> need to >> > > > >> > > > > >> discuss that part for me. I buy it that JBODs are >> good. >> > > > >> > > > > >> >> > > > >> > > > > >> I find the terminology can be improved a bit. Ideally >> > we'd >> > > be >> > > > >> > > talking >> > > > >> > > > > >> about volumes, not disks. Just to make it clear that >> > Kafka >> > > > >> > > understand >> > > > >> > > > > >> volumes/directories, not individual raw disks. So by >> > > > >> > > > > >> "one-broker-per-few-disks" what I mean is that the >> admin >> > > can >> > > > >> pool a >> > > > >> > > few >> > > > >> > > > > >> disks together to create a volume/directory and give >> that >> > > to >> > > > >> Kafka. >> > > > >> > > > > >> >> > > > >> > > > > >> >> > > > >> > > > > >> The kernel of my question will be that the admin >> already >> > > has >> > > > >> tools >> > > > >> > > to 1) >> > > > >> > > > > >> create volumes/directories from a JBOD and 2) start a >> > > broker >> > > > >> on a >> > > > >> > > desired >> > > > >> > > > > >> machine and 3) assign a broker resources like a >> > directory. >> > > I >> > > > >> claim >> > > > >> > > that >> > > > >> > > > > >> those tools are sufficient to optimise resource >> > allocation. >> > > > I >> > > > >> > > understand >> > > > >> > > > > >> that a broker could manage point 3) itself, ie juggle >> the >> > > > >> > > directories. My >> > > > >> > > > > >> question is whether the complexity added to Kafka is >> > > > justified. >> > > > >> > > > > >> Operationally it seems to me an admin will still have >> to >> > do >> > > > >> all the >> > > > >> > > three >> > > > >> > > > > >> items above. >> > > > >> > > > > >> >> > > > >> > > > > >> Looking forward to the discussion >> > > > >> > > > > >> Thanks >> > > > >> > > > > >> Eno >> > > > >> > > > > >> >> > > > >> > > > > >> >> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin < >> lindon...@gmail.com >> > > >> > > > >> wrote: >> > > > >> > > > > >> > >> > > > >> > > > > >> > Hey Eno, >> > > > >> > > > > >> > >> > > > >> > > > > >> > Thanks much for the review. 
>> > > > >> > > > > >> > >> > > > >> > > > > >> > I think your suggestion is to split disks of a >> machine >> > > into >> > > > >> > > multiple >> > > > >> > > > > >> disk >> > > > >> > > > > >> > sets and run one broker per disk set. Yeah this is >> > > similar >> > > > to >> > > > >> > > Colin's >> > > > >> > > > > >> > suggestion of one-broker-per-disk, which we have >> > > evaluated >> > > > at >> > > > >> > > LinkedIn >> > > > >> > > > > >> and >> > > > >> > > > > >> > considered it to be a good short term approach. >> > > > >> > > > > >> > >> > > > >> > > > > >> > As of now I don't think any of these approach is a >> > better >> > > > >> > > alternative in >> > > > >> > > > > >> > the long term. I will summarize these here. I have >> put >> > > > these >> > > > >> > > reasons in >> > > > >> > > > > >> the >> > > > >> > > > > >> > KIP's motivation section and rejected alternative >> > > section. >> > > > I >> > > > >> am >> > > > >> > > happy to >> > > > >> > > > > >> > discuss more and I would certainly like to use an >> > > > alternative >> > > > >> > > solution >> > > > >> > > > > >> that >> > > > >> > > > > >> > is easier to do with better performance. >> > > > >> > > > > >> > >> > > > >> > > > > >> > - JBOD vs. RAID-10: if we switch from RAID-10 with >> > > > >> > > > > >> replication-factoer=2 to >> > > > >> > > > > >> > JBOD with replicatio-factor=3, we get 25% reduction >> in >> > > disk >> > > > >> usage >> > > > >> > > and >> > > > >> > > > > >> > doubles the tolerance of broker failure before data >> > > > >> > > unavailability from >> > > > >> > > > > >> 1 >> > > > >> > > > > >> > to 2. This is pretty huge gain for any company that >> > uses >> > > > >> Kafka at >> > > > >> > > large >> > > > >> > > > > >> > scale. >> > > > >> > > > > >> > >> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit of >> > > > >> > > one-broker-per-disk is >> > > > >> > > > > >> that >> > > > >> > > > > >> > no major code change is needed in Kafka. Among the >> > > > >> disadvantage of >> > > > >> > > > > >> > one-broker-per-disk summarized in the KIP and >> previous >> > > > email >> > > > >> with >> > > > >> > > Colin, >> > > > >> > > > > >> > the biggest one is the 15% throughput loss compared >> to >> > > JBOD >> > > > >> and >> > > > >> > > less >> > > > >> > > > > >> > flexibility to balance across disks. Further, it >> > probably >> > > > >> requires >> > > > >> > > > > >> change >> > > > >> > > > > >> > to internal deployment tools at various companies to >> > deal >> > > > >> with >> > > > >> > > > > >> > one-broker-per-disk setup. >> > > > >> > > > > >> > >> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup that used at >> > > > Microsoft. >> > > > >> The >> > > > >> > > > > >> problem is >> > > > >> > > > > >> > that a broker becomes unavailable if any disk fail. >> > > Suppose >> > > > >> > > > > >> > replication-factor=2 and there are 10 disks per >> > machine. >> > > > >> Then the >> > > > >> > > > > >> > probability of of any message becomes unavailable >> due >> > to >> > > > disk >> > > > >> > > failure >> > > > >> > > > > >> with >> > > > >> > > > > >> > RAID-0 is 100X higher than that with JBOD. >> > > > >> > > > > >> > >> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks: >> > > > one-broker-per-few-disk >> > > > >> is >> > > > >> > > > > >> somewhere >> > > > >> > > > > >> > between one-broker-per-disk and RAID-0. So it >> carries >> > an >> > > > >> averaged >> > > > >> > > > > >> > disadvantages of these two approaches. 
>> > > > >> > > > > >> > >> > > > >> > > > > >> > To answer your question regarding, I think it is >> > > reasonable >> > > > >> to >> > > > >> > > mange >> > > > >> > > > > >> disk >> > > > >> > > > > >> > in Kafka. By "managing disks" we mean the >> management of >> > > > >> > > assignment of >> > > > >> > > > > >> > replicas across disks. Here are my reasons in more >> > > detail: >> > > > >> > > > > >> > >> > > > >> > > > > >> > - I don't think this KIP is a big step change. By >> > > allowing >> > > > >> user to >> > > > >> > > > > >> > configure Kafka to run multiple log directories or >> > disks >> > > as >> > > > >> of >> > > > >> > > now, it >> > > > >> > > > > >> is >> > > > >> > > > > >> > implicit that Kafka manages disks. It is just not a >> > > > complete >> > > > >> > > feature. >> > > > >> > > > > >> > Microsoft and probably other companies are using >> this >> > > > feature >> > > > >> > > under the >> > > > >> > > > > >> > undesirable effect that a broker will fail any if >> any >> > > disk >> > > > >> fail. >> > > > >> > > It is >> > > > >> > > > > >> good >> > > > >> > > > > >> > to complete this feature. >> > > > >> > > > > >> > >> > > > >> > > > > >> > - I think it is reasonable to manage disk in Kafka. >> One >> > > of >> > > > >> the >> > > > >> > > most >> > > > >> > > > > >> > important work that Kafka is doing is to determine >> the >> > > > >> replica >> > > > >> > > > > >> assignment >> > > > >> > > > > >> > across brokers and make sure enough copies of a >> given >> > > > >> replica is >> > > > >> > > > > >> available. >> > > > >> > > > > >> > I would argue that it is not much different than >> > > > determining >> > > > >> the >> > > > >> > > replica >> > > > >> > > > > >> > assignment across disk conceptually. >> > > > >> > > > > >> > >> > > > >> > > > > >> > - I would agree that this KIP is improve >> performance of >> > > > >> Kafka at >> > > > >> > > the >> > > > >> > > > > >> cost >> > > > >> > > > > >> > of more complexity inside Kafka, by switching from >> > > RAID-10 >> > > > to >> > > > >> > > JBOD. I >> > > > >> > > > > >> would >> > > > >> > > > > >> > argue that this is a right direction. If we can gain >> > 20%+ >> > > > >> > > performance by >> > > > >> > > > > >> > managing NIC in Kafka as compared to existing >> approach >> > > and >> > > > >> other >> > > > >> > > > > >> > alternatives, I would say we should just do it. >> Such a >> > > gain >> > > > >> in >> > > > >> > > > > >> performance, >> > > > >> > > > > >> > or equivalently reduction in cost, can save >> millions of >> > > > >> dollars >> > > > >> > > per year >> > > > >> > > > > >> > for any company running Kafka at large scale. >> > > > >> > > > > >> > >> > > > >> > > > > >> > Thanks, >> > > > >> > > > > >> > Dong >> > > > >> > > > > >> > >> > > > >> > > > > >> > >> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno Thereska < >> > > > >> > > eno.there...@gmail.com> >> > > > >> > > > > >> wrote: >> > > > >> > > > > >> > >> > > > >> > > > > >> >> I'm coming somewhat late to the discussion, >> apologies >> > > for >> > > > >> that. >> > > > >> > > > > >> >> >> > > > >> > > > > >> >> I'm worried about this proposal. It's moving Kafka >> to >> > a >> > > > >> world >> > > > >> > > where it >> > > > >> > > > > >> >> manages disks. So in a sense, the scope of the KIP >> is >> > > > >> limited, >> > > > >> > > but the >> > > > >> > > > > >> >> direction it sets for Kafka is quite a big step >> > change. 
>> > > > >> > > Fundamentally >> > > > >> > > > > >> this >> > > > >> > > > > >> >> is about balancing resources for a Kafka broker. >> This >> > > can >> > > > be >> > > > >> > > done by a >> > > > >> > > > > >> >> tool, rather than by changing Kafka. E.g., the tool >> > > would >> > > > >> take a >> > > > >> > > bunch >> > > > >> > > > > >> of >> > > > >> > > > > >> >> disks together, create a volume over them and >> export >> > > that >> > > > >> to a >> > > > >> > > Kafka >> > > > >> > > > > >> broker >> > > > >> > > > > >> >> (in addition to setting the memory limits for that >> > > broker >> > > > or >> > > > >> > > limiting >> > > > >> > > > > >> other >> > > > >> > > > > >> >> resources). A different bunch of disks can then >> make >> > up >> > > a >> > > > >> second >> > > > >> > > > > >> volume, >> > > > >> > > > > >> >> and be used by another Kafka broker. This is >> aligned >> > > with >> > > > >> what >> > > > >> > > Colin is >> > > > >> > > > > >> >> saying (as I understand it). >> > > > >> > > > > >> >> >> > > > >> > > > > >> >> Disks are not the only resource on a machine, there >> > are >> > > > >> several >> > > > >> > > > > >> instances >> > > > >> > > > > >> >> where multiple NICs are used for example. Do we >> want >> > > fine >> > > > >> grained >> > > > >> > > > > >> >> management of all these resources? I'd argue that >> > opens >> > > us >> > > > >> the >> > > > >> > > system >> > > > >> > > > > >> to a >> > > > >> > > > > >> >> lot of complexity. >> > > > >> > > > > >> >> >> > > > >> > > > > >> >> Thanks >> > > > >> > > > > >> >> Eno >> > > > >> > > > > >> >> >> > > > >> > > > > >> >> >> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin < >> > lindon...@gmail.com >> > > > >> > > > >> wrote: >> > > > >> > > > > >> >>> >> > > > >> > > > > >> >>> Hi all, >> > > > >> > > > > >> >>> >> > > > >> > > > > >> >>> I am going to initiate the vote If there is no >> > further >> > > > >> concern >> > > > >> > > with >> > > > >> > > > > >> the >> > > > >> > > > > >> >> KIP. >> > > > >> > > > > >> >>> >> > > > >> > > > > >> >>> Thanks, >> > > > >> > > > > >> >>> Dong >> > > > >> > > > > >> >>> >> > > > >> > > > > >> >>> >> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai < >> > > > >> > > radai.rosenbl...@gmail.com> >> > > > >> > > > > >> >> wrote: >> > > > >> > > > > >> >>> >> > > > >> > > > > >> >>>> a few extra points: >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >>>> 1. broker per disk might also incur more client >> <--> >> > > > >> broker >> > > > >> > > sockets: >> > > > >> > > > > >> >>>> suppose every producer / consumer "talks" to >1 >> > > > partition, >> > > > >> > > there's a >> > > > >> > > > > >> >> very >> > > > >> > > > > >> >>>> good chance that partitions that were co-located >> on >> > a >> > > > >> single >> > > > >> > > 10-disk >> > > > >> > > > > >> >> broker >> > > > >> > > > > >> >>>> would now be split between several single-disk >> > broker >> > > > >> > > processes on >> > > > >> > > > > >> the >> > > > >> > > > > >> >> same >> > > > >> > > > > >> >>>> machine. hard to put a multiplier on this, but >> > likely >> > > > >x1. >> > > > >> > > sockets >> > > > >> > > > > >> are a >> > > > >> > > > > >> >>>> limited resource at the OS level and incur some >> > memory >> > > > >> cost >> > > > >> > > (kernel >> > > > >> > > > > >> >>>> buffers) >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >>>> 2. 
there's a memory overhead to spinning up a JVM >> > > > >> (compiled >> > > > >> > > code and >> > > > >> > > > > >> >> byte >> > > > >> > > > > >> >>>> code objects etc). if we assume this overhead is >> > ~300 >> > > MB >> > > > >> > > (order of >> > > > >> > > > > >> >>>> magnitude, specifics vary) than spinning up 10 >> JVMs >> > > > would >> > > > >> lose >> > > > >> > > you 3 >> > > > >> > > > > >> GB >> > > > >> > > > > >> >> of >> > > > >> > > > > >> >>>> RAM. not a ton, but non negligible. >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >>>> 3. there would also be some overhead downstream >> of >> > > kafka >> > > > >> in any >> > > > >> > > > > >> >> management >> > > > >> > > > > >> >>>> / monitoring / log aggregation system. likely >> less >> > > than >> > > > >> x10 >> > > > >> > > though. >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >>>> 4. (related to above) - added complexity of >> > > > administration >> > > > >> > > with more >> > > > >> > > > > >> >>>> running instances. >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >>>> is anyone running kafka with anywhere near 100GB >> > > heaps? >> > > > i >> > > > >> > > thought the >> > > > >> > > > > >> >> point >> > > > >> > > > > >> >>>> was to rely on kernel page cache to do the disk >> > > > buffering >> > > > >> .... >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin < >> > > > >> > > lindon...@gmail.com> >> > > > >> > > > > >> wrote: >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >>>>> Hey Colin, >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> Thanks much for the comment. Please see me >> comment >> > > > >> inline. >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe < >> > > > >> > > cmcc...@apache.org> >> > > > >> > > > > >> >>>> wrote: >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote: >> > > > >> > > > > >> >>>>>>> Hey Colin, >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> Good point! Yeah we have actually considered >> and >> > > > >> tested this >> > > > >> > > > > >> >>>> solution, >> > > > >> > > > > >> >>>>>>> which we call one-broker-per-disk. It would >> work >> > > and >> > > > >> should >> > > > >> > > > > >> require >> > > > >> > > > > >> >>>> no >> > > > >> > > > > >> >>>>>>> major change in Kafka as compared to this JBOD >> > KIP. >> > > > So >> > > > >> it >> > > > >> > > would >> > > > >> > > > > >> be a >> > > > >> > > > > >> >>>>> good >> > > > >> > > > > >> >>>>>>> short term solution. >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> But it has a few drawbacks which makes it less >> > > > >> desirable in >> > > > >> > > the >> > > > >> > > > > >> long >> > > > >> > > > > >> >>>>>>> term. >> > > > >> > > > > >> >>>>>>> Assume we have 10 disks on a machine. Here are >> > the >> > > > >> problems: >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>> Hi Dong, >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>> Thanks for the thoughtful reply. 
>> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> 1) Our stress test result shows that >> > > > >> one-broker-per-disk >> > > > >> > > has 15% >> > > > >> > > > > >> >>>> lower >> > > > >> > > > > >> >>>>>>> throughput >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> 2) Controller would need to send 10X as many >> > > > >> > > LeaderAndIsrRequest, >> > > > >> > > > > >> >>>>>>> MetadataUpdateRequest and StopReplicaRequest. >> > This >> > > > >> > > increases the >> > > > >> > > > > >> >>>> burden >> > > > >> > > > > >> >>>>>>> on >> > > > >> > > > > >> >>>>>>> controller which can be the performance >> > bottleneck. >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>> Maybe I'm misunderstanding something, but there >> > > would >> > > > >> not be >> > > > >> > > 10x as >> > > > >> > > > > >> >>>> many >> > > > >> > > > > >> >>>>>> StopReplicaRequest RPCs, would there? The >> other >> > > > >> requests >> > > > >> > > would >> > > > >> > > > > >> >>>> increase >> > > > >> > > > > >> >>>>>> 10x, but from a pretty low base, right? We are >> > not >> > > > >> > > reassigning >> > > > >> > > > > >> >>>>>> partitions all the time, I hope (or else we >> have >> > > > bigger >> > > > >> > > > > >> problems...) >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> I think the controller will group >> > StopReplicaRequest >> > > > per >> > > > >> > > broker and >> > > > >> > > > > >> >> send >> > > > >> > > > > >> >>>>> only one StopReplicaRequest to a broker during >> > > > controlled >> > > > >> > > shutdown. >> > > > >> > > > > >> >>>> Anyway, >> > > > >> > > > > >> >>>>> we don't have to worry about this if we agree >> that >> > > > other >> > > > >> > > requests >> > > > >> > > > > >> will >> > > > >> > > > > >> >>>>> increase by 10X. One MetadataRequest to send to >> > each >> > > > >> broker >> > > > >> > > in the >> > > > >> > > > > >> >>>> cluster >> > > > >> > > > > >> >>>>> every time there is leadership change. I am not >> > sure >> > > > >> this is >> > > > >> > > a real >> > > > >> > > > > >> >>>>> problem. But in theory this makes the overhead >> > > > complexity >> > > > >> > > O(number >> > > > >> > > > > >> of >> > > > >> > > > > >> >>>>> broker) and may be a concern in the future. >> Ideally >> > > we >> > > > >> should >> > > > >> > > avoid >> > > > >> > > > > >> it. >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> 3) Less efficient use of physical resource on >> the >> > > > >> machine. >> > > > >> > > The >> > > > >> > > > > >> number >> > > > >> > > > > >> >>>>> of >> > > > >> > > > > >> >>>>>>> socket on each machine will increase by 10X. >> The >> > > > >> number of >> > > > >> > > > > >> connection >> > > > >> > > > > >> >>>>>>> between any two machine will increase by 100X. >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> 4) Less efficient way to management memory and >> > > quota. >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> 5) Rebalance between disks/brokers on the same >> > > > machine >> > > > >> will >> > > > >> > > less >> > > > >> > > > > >> >>>>>>> efficient >> > > > >> > > > > >> >>>>>>> and less flexible. Broker has to read data >> from >> > > > another >> > > > >> > > broker on >> > > > >> > > > > >> the >> > > > >> > > > > >> >>>>>>> same >> > > > >> > > > > >> >>>>>>> machine via socket. 
It is also harder to do >> > > automatic >> > > > >> load >> > > > >> > > balance >> > > > >> > > > > >> >>>>>>> between >> > > > >> > > > > >> >>>>>>> disks on the same machine in the future. >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> I will put this and the explanation in the >> > rejected >> > > > >> > > alternative >> > > > >> > > > > >> >>>>> section. >> > > > >> > > > > >> >>>>>>> I >> > > > >> > > > > >> >>>>>>> have a few questions: >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> - Can you explain why this solution can help >> > avoid >> > > > >> > > scalability >> > > > >> > > > > >> >>>>>>> bottleneck? >> > > > >> > > > > >> >>>>>>> I actually think it will exacerbate the >> > scalability >> > > > >> problem >> > > > >> > > due >> > > > >> > > > > >> the >> > > > >> > > > > >> >>>> 2) >> > > > >> > > > > >> >>>>>>> above. >> > > > >> > > > > >> >>>>>>> - Why can we push more RPC with this solution? >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>> To really answer this question we'd have to >> take a >> > > > deep >> > > > >> dive >> > > > >> > > into >> > > > >> > > > > >> the >> > > > >> > > > > >> >>>>>> locking of the broker and figure out how >> > effectively >> > > > it >> > > > >> can >> > > > >> > > > > >> >> parallelize >> > > > >> > > > > >> >>>>>> truly independent requests. Almost every >> > > > multithreaded >> > > > >> > > process is >> > > > >> > > > > >> >>>> going >> > > > >> > > > > >> >>>>>> to have shared state, like shared queues or >> shared >> > > > >> sockets, >> > > > >> > > that is >> > > > >> > > > > >> >>>>>> going to make scaling less than linear when you >> > add >> > > > >> disks or >> > > > >> > > > > >> >>>> processors. >> > > > >> > > > > >> >>>>>> (And clearly, another option is to improve that >> > > > >> scalability, >> > > > >> > > rather >> > > > >> > > > > >> >>>>>> than going multi-process!) >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> Yeah I also think it is better to improve >> > scalability >> > > > >> inside >> > > > >> > > kafka >> > > > >> > > > > >> code >> > > > >> > > > > >> >>>> if >> > > > >> > > > > >> >>>>> possible. I am not sure we currently have any >> > > > scalability >> > > > >> > > issue >> > > > >> > > > > >> inside >> > > > >> > > > > >> >>>>> Kafka that can not be removed without using >> > > > >> multi-process. >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>>> - It is true that a garbage collection in one >> > > broker >> > > > >> would >> > > > >> > > not >> > > > >> > > > > >> affect >> > > > >> > > > > >> >>>>>>> others. But that is after every broker only >> uses >> > > 1/10 >> > > > >> of the >> > > > >> > > > > >> memory. >> > > > >> > > > > >> >>>>> Can >> > > > >> > > > > >> >>>>>>> we be sure that this will actually help >> > > performance? >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>> The big question is, how much memory do Kafka >> > > brokers >> > > > >> use >> > > > >> > > now, and >> > > > >> > > > > >> how >> > > > >> > > > > >> >>>>>> much will they use in the future? Our >> experience >> > in >> > > > >> HDFS >> > > > >> > > was that >> > > > >> > > > > >> >> once >> > > > >> > > > > >> >>>>>> you start getting more than 100-200GB Java heap >> > > sizes, >> > > > >> full >> > > > >> > > GCs >> > > > >> > > > > >> start >> > > > >> > > > > >> >>>>>> taking minutes to finish when using the >> standard >> > > JVMs. 
>> > > > >> That >> > > > >> > > alone >> > > > >> > > > > >> is >> > > > >> > > > > >> >> a >> > > > >> > > > > >> >>>>>> good reason to go multi-process or consider >> > storing >> > > > more >> > > > >> > > things off >> > > > >> > > > > >> >> the >> > > > >> > > > > >> >>>>>> Java heap. >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> I see. Now I agree one-broker-per-disk should be >> > more >> > > > >> > > efficient in >> > > > >> > > > > >> >> terms >> > > > >> > > > > >> >>>> of >> > > > >> > > > > >> >>>>> GC since each broker probably needs less than >> 1/10 >> > of >> > > > the >> > > > >> > > memory >> > > > >> > > > > >> >>>> available >> > > > >> > > > > >> >>>>> on a typical machine nowadays. I will remove >> this >> > > from >> > > > >> the >> > > > >> > > reason of >> > > > >> > > > > >> >>>>> rejection. >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>> Disk failure is the "easy" case. The "hard" >> case, >> > > > >> which is >> > > > >> > > > > >> >>>>>> unfortunately also the much more common case, >> is >> > > disk >> > > > >> > > misbehavior. >> > > > >> > > > > >> >>>>>> Towards the end of their lives, disks tend to >> > start >> > > > >> slowing >> > > > >> > > down >> > > > >> > > > > >> >>>>>> unpredictably. Requests that would have >> completed >> > > > >> > > immediately >> > > > >> > > > > >> before >> > > > >> > > > > >> >>>>>> start taking 20, 100 500 milliseconds. Some >> files >> > > may >> > > > >> be >> > > > >> > > readable >> > > > >> > > > > >> and >> > > > >> > > > > >> >>>>>> other files may not be. System calls hang, >> > > sometimes >> > > > >> > > forever, and >> > > > >> > > > > >> the >> > > > >> > > > > >> >>>>>> Java process can't abort them, because the >> hang is >> > > in >> > > > >> the >> > > > >> > > kernel. >> > > > >> > > > > >> It >> > > > >> > > > > >> >>>> is >> > > > >> > > > > >> >>>>>> not fun when threads are stuck in "D state" >> > > > >> > > > > >> >>>>>> http://stackoverflow.com/quest >> > > > >> ions/20423521/process-perminan >> > > > >> > > > > >> >>>>>> tly-stuck-on-d-state >> > > > >> > > > > >> >>>>>> . Even kill -9 cannot abort the thread then. >> > > > >> Fortunately, >> > > > >> > > this is >> > > > >> > > > > >> >>>>>> rare. >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> I agree it is a harder problem and it is rare. >> We >> > > > >> probably >> > > > >> > > don't >> > > > >> > > > > >> have >> > > > >> > > > > >> >> to >> > > > >> > > > > >> >>>>> worry about it in this KIP since this issue is >> > > > >> orthogonal to >> > > > >> > > > > >> whether or >> > > > >> > > > > >> >>>> not >> > > > >> > > > > >> >>>>> we use JBOD. >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>> Another approach we should consider is for >> Kafka >> > to >> > > > >> > > implement its >> > > > >> > > > > >> own >> > > > >> > > > > >> >>>>>> storage layer that would stripe across multiple >> > > disks. >> > > > >> This >> > > > >> > > > > >> wouldn't >> > > > >> > > > > >> >>>>>> have to be done at the block level, but could >> be >> > > done >> > > > >> at the >> > > > >> > > file >> > > > >> > > > > >> >>>> level. >> > > > >> > > > > >> >>>>>> We could use consistent hashing to determine >> which >> > > > >> disks a >> > > > >> > > file >> > > > >> > > > > >> should >> > > > >> > > > > >> >>>>>> end up on, for example. 
>> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> Are you suggesting that we should distribute >> log, >> > or >> > > > log >> > > > >> > > segment, >> > > > >> > > > > >> >> across >> > > > >> > > > > >> >>>>> disks of brokers? I am not sure if I fully >> > understand >> > > > >> this >> > > > >> > > > > >> approach. My >> > > > >> > > > > >> >>>> gut >> > > > >> > > > > >> >>>>> feel is that this would be a drastic solution >> that >> > > > would >> > > > >> > > require >> > > > >> > > > > >> >>>>> non-trivial design. While this may be useful to >> > > Kafka, >> > > > I >> > > > >> would >> > > > >> > > > > >> prefer >> > > > >> > > > > >> >> not >> > > > >> > > > > >> >>>>> to discuss this in detail in this thread unless >> you >> > > > >> believe >> > > > >> > > it is >> > > > >> > > > > >> >>>> strictly >> > > > >> > > > > >> >>>>> superior to the design in this KIP in terms of >> > > solving >> > > > >> our >> > > > >> > > use-case. >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>>>> best, >> > > > >> > > > > >> >>>>>> Colin >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> Thanks, >> > > > >> > > > > >> >>>>>>> Dong >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM, Colin >> McCabe < >> > > > >> > > > > >> cmcc...@apache.org> >> > > > >> > > > > >> >>>>>>> wrote: >> > > > >> > > > > >> >>>>>>> >> > > > >> > > > > >> >>>>>>>> Hi Dong, >> > > > >> > > > > >> >>>>>>>> >> > > > >> > > > > >> >>>>>>>> Thanks for the writeup! It's very >> interesting. >> > > > >> > > > > >> >>>>>>>> >> > > > >> > > > > >> >>>>>>>> I apologize in advance if this has been >> > discussed >> > > > >> > > somewhere else. >> > > > >> > > > > >> >>>>> But >> > > > >> > > > > >> >>>>>> I >> > > > >> > > > > >> >>>>>>>> am curious if you have considered the >> solution >> > of >> > > > >> running >> > > > >> > > > > >> multiple >> > > > >> > > > > >> >>>>>>>> brokers per node. Clearly there is a memory >> > > > overhead >> > > > >> with >> > > > >> > > this >> > > > >> > > > > >> >>>>>> solution >> > > > >> > > > > >> >>>>>>>> because of the fixed cost of starting >> multiple >> > > JVMs. >> > > > >> > > However, >> > > > >> > > > > >> >>>>> running >> > > > >> > > > > >> >>>>>>>> multiple JVMs would help avoid scalability >> > > > >> bottlenecks. >> > > > >> > > You >> > > > >> > > > > >> could >> > > > >> > > > > >> >>>>>>>> probably push more RPCs per second, for >> example. >> > > A >> > > > >> garbage >> > > > >> > > > > >> >>>>> collection >> > > > >> > > > > >> >>>>>>>> in one broker would not affect the others. >> It >> > > would >> > > > >> be >> > > > >> > > > > >> interesting >> > > > >> > > > > >> >>>>> to >> > > > >> > > > > >> >>>>>>>> see this considered in the "alternate >> designs" >> > > > design, >> > > > >> > > even if >> > > > >> > > > > >> you >> > > > >> > > > > >> >>>>> end >> > > > >> > > > > >> >>>>>>>> up deciding it's not the way to go. >> > > > >> > > > > >> >>>>>>>> >> > > > >> > > > > >> >>>>>>>> best, >> > > > >> > > > > >> >>>>>>>> Colin >> > > > >> > > > > >> >>>>>>>> >> > > > >> > > > > >> >>>>>>>> >> > > > >> > > > > >> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong Lin >> wrote: >> > > > >> > > > > >> >>>>>>>>> Hi all, >> > > > >> > > > > >> >>>>>>>>> >> > > > >> > > > > >> >>>>>>>>> We created KIP-112: Handle disk failure for >> > JBOD. 
>> > > > >> Please >> > > > >> > > find >> > > > >> > > > > >> the >> > > > >> > > > > >> >>>>> KIP >> > > > >> > > > > >> >>>>>>>>> wiki >> > > > >> > > > > >> >>>>>>>>> in the link https://cwiki.apache.org/confl >> > > > >> > > > > >> >>>> uence/display/KAFKA/KIP- >> > > > >> > > > > >> >>>>>>>>> 112%3A+Handle+disk+failure+for+JBOD. >> > > > >> > > > > >> >>>>>>>>> >> > > > >> > > > > >> >>>>>>>>> This KIP is related to KIP-113 >> > > > >> > > > > >> >>>>>>>>> <https://cwiki.apache.org/conf >> > > > >> luence/display/KAFKA/KIP- >> > > > >> > > > > >> >>>>>>>> 113%3A+Support+replicas+moveme >> > > > >> nt+between+log+directories>: >> > > > >> > > > > >> >>>>>>>>> Support replicas movement between log >> > > directories. >> > > > >> They >> > > > >> > > are >> > > > >> > > > > >> >>>> needed >> > > > >> > > > > >> >>>>> in >> > > > >> > > > > >> >>>>>>>>> order >> > > > >> > > > > >> >>>>>>>>> to support JBOD in Kafka. Please help review >> > the >> > > > >> KIP. You >> > > > >> > > > > >> >>>> feedback >> > > > >> > > > > >> >>>>> is >> > > > >> > > > > >> >>>>>>>>> appreciated! >> > > > >> > > > > >> >>>>>>>>> >> > > > >> > > > > >> >>>>>>>>> Thanks, >> > > > >> > > > > >> >>>>>>>>> Dong >> > > > >> > > > > >> >>>>>>>> >> > > > >> > > > > >> >>>>>> >> > > > >> > > > > >> >>>>> >> > > > >> > > > > >> >>>> >> > > > >> > > > > >> >> >> > > > >> > > > > >> >> >> > > > >> > > > > >> >> > > > >> > > > > >> >> > > > >> > > > > > >> > > > >> > > >> > > > >> >> > > > > >> > > > > >> > > > >> > > >> > >> > >