Hey Jun, After thinking more about 14, I think your solution is reasonable. I have updated the KIP to specify that the number of ReplicaMoveThreads defaults to # log dirs.
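As a rough illustration of that default (the names below are hypothetical, not the actual KafkaConfig code), the resolution could look something like this:

    import java.util.Properties;

    // Hypothetical sketch: one ReplicaMoveThread per configured log directory
    // unless num.replica.move.threads is set explicitly.
    final class ReplicaMoveThreadsDefault {
        static int resolve(Properties brokerProps) {
            String explicit = brokerProps.getProperty("num.replica.move.threads");
            if (explicit != null) {
                return Integer.parseInt(explicit);
            }
            String logDirs = brokerProps.getProperty("log.dirs", "");
            // Number of log directories = number of comma-separated entries in log.dirs.
            return Math.max(1, logDirs.split(",").length);
        }
    }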
Thanks! Dong On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <lindon...@gmail.com> wrote: > Hey Jun, > > Thanks for your comment! Please see my reply below. > > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io> wrote: > >> Hi, Dong, >> >> Thanks for the reply. >> >> 10. Could you comment on that? >> > > Sorry, I missed that comment. > > Good point. I think the log segments in the topicPartition.move directory will > be subject to log truncation, log retention and log cleaning in the same > way as the log segments in the source log directory. I just specified this > in the KIP. > > >> >> 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if >> broker >> restarts after it sends ChangeReplicaDirResponse but before it receives >> LeaderAndIsrRequest." >> >> In that case, the reassignment tool could detect that through >> DescribeDirsRequest >> and issue ChangeReplicaDirRequest again, right? In the common case, this >> is >> probably not needed and we only need to write each replica once. >> >> My main concern with the approach in the current KIP is that once a new >> replica is created in the wrong log dir, the cross log directory movement >> may not catch up until the new replica is fully bootstrapped. So, we end >> up >> writing the data for the same replica twice. >> > > I agree with your concern. My main concern is that it is a bit weird if > ChangeReplicaDirResponse cannot guarantee success and the tool needs to > rely on DescribeDirsResponse to see if it needs to send > ChangeReplicaDirRequest again. > > How about this: if the broker does not already have a replica created for the > specified topicPartition when it receives ChangeReplicaDirRequest, it will > reply with ReplicaNotAvailableException AND remember the (replica, destination log > directory) pair in memory so that it can create the replica in the specified log > directory. > > >> >> 11.3 Are you saying the value in --throttle will be used to set both >> intra.broker.throttled.rate and leader.follower.replication.throttled.replicas? >> > > No. --throttle will be used only to set leader.follower.replication.throttled.replicas, as > it does now. I think we do not need any option in > kafka-reassign-partitions.sh to specify intra.broker.throttled.rate. > Users can set it in the broker config or dynamically using kafka-configs.sh. Does > this sound OK? > > >> >> 12.2 If the user only wants to check one topic, the tool could do the >> filtering on the client side, right? My concern with having both log_dirs >> and topics is the semantic. For example, if both are not empty, do we >> return the intersection or the union? >> > > Yes, the tool could filter on the client side. But the purpose of having > this field is to reduce the response size in case the broker has a lot of topics. > Both fields are used as filters and the result is the intersection. Do you > think this semantic is confusing or counter-intuitive? > > >> >> 12.3. Yes, firstOffset may not be useful. >> >> 14. Hmm, I would think moving data across log dirs will be io bound. We >> also have num.recovery.threads.per.data.dir, which defaults to 1. So, >> having num.replica.move.threads default to # log dirs or half of that (to >> account for ios on both source and target) seems reasonable. Is a magical >> value of 3 going to be better? Does that work with only 2 log dirs? There >> will always be cases when the user needs to customize the value. We just >> need a reasonable default to cover the common case. 
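Dong's reply below argues that the number of ReplicaMoveThreads does not need to match the number of log directories. A minimal sketch of that model, with a shared queue of pending moves drained by a fixed pool (class and method names are illustrative, not the KIP's actual implementation):

    import java.util.concurrent.BlockingQueue;

    // One entry per replica waiting to be moved to another log directory.
    class PendingMove {
        final String topicPartition;
        final String destinationDir;
        PendingMove(String topicPartition, String destinationDir) {
            this.topicPartition = topicPartition;
            this.destinationDir = destinationDir;
        }
    }

    class ReplicaMoveThread extends Thread {
        private final BlockingQueue<PendingMove> pendingMoves;

        ReplicaMoveThread(BlockingQueue<PendingMove> pendingMoves) {
            this.pendingMoves = pendingMoves;
        }

        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    // Take whichever replica is waiting for movement, regardless of
                    // which log directory it involves, finish it, then check again.
                    PendingMove move = pendingMoves.take();
                    moveReplica(move);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void moveReplica(PendingMove move) {
            // Copy the segments of move.topicPartition into move.destinationDir and
            // swap once caught up (the swap is discussed elsewhere in the thread).
        }
    }

Starting, say, three such threads against one shared LinkedBlockingQueue serves two, four, or any other number of log directories.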
>> > > If the throughput of moving data across dirs does not increase with the > number of threads, I think we should provide a config > num.replica.move.thread.per.log.dir and give it a default value of 1. That > works in the same way as num.recovery.threads.per.data.dir. But I think > the replica movement is not necessarily IO bound if the broker is using SSDs. > Thus it seems more reasonable to have a config num.replica.move.threads that > is shared across all log directories. > > Currently all Kafka configs, including num.recovery.threads.per.data.dir, > default to a constant value instead of relying on the values of other configs. Thus > it would be a bit weird if the config name itself is not per log dir but its > default value is per dir. It would also make both the code and the user > documentation a bit more complicated. The advantage of using a magic value is simplicity. To answer your > question, I think 3 ReplicaMoveThreads can work with more than 2 log > directories. Say there are 3 ReplicaMoveThreads and 4 log directories: each > ReplicaMoveThread will check if there is any replica waiting for movement, > finish the movement of that replica, and check again. Is there any concern with > this approach? > > I have chosen the magic value 3 because the current default number of network > threads is 3. We can also set it to 8, which is the default number of io > threads. Would there be any performance concern with using 8 threads by > default? > > > >> >> 20. Should we support canceling the movement across log dirs? I was >> thinking this can be achieved with a ChangeReplicaDirRequest with dir = >> any. >> > > As of the current KIP, the user can cancel movement across log directories by first > sending DescribeDirsRequest to figure out the source directory of those > replicas that are being moved, and then sending ChangeReplicaDirRequest to > move the replicas back to the source log directory. But "any" seems like an easier > and more reasonable approach to cancel replica movement. I just added it to the > KIP. > > >> >> Jun >> >> >> On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <lindon...@gmail.com> wrote: >> >> > Hey Jun, >> > >> > Thanks much for your detailed comments. Please see my reply below. >> > >> > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io> wrote: >> > >> > > Hi, Dong, >> > > >> > > Thanks for the updated KIP. Some more comments below. >> > > >> > > 10. For the .move log, do we perform any segment deletion (based on >> > > retention) or log cleaning (if a compacted topic)? Or do we only >> enable >> > > that after the swap? >> > > >> > > 11. kafka-reassign-partitions.sh >> > > 11.1 If all reassigned replicas are in the current broker and only the >> log >> > > directories have changed, we can probably optimize the tool to not >> trigger >> > > partition reassignment through the controller and only >> > > send ChangeReplicaDirRequest. >> > > >> > >> > Yes, the reassignment script should not create the reassignment znode >> if no >> > replicas need to be moved between brokers. This falls into the "How to >> move >> > replica between log directories on the same broker" part of the Proposed >> Changes >> > section. >> > >> > >> > > 11.2 If ChangeReplicaDirRequest specifies a replica that's not created >> yet, >> > > could the broker just remember that in memory and create the replica >> when >> > > the creation is requested? 
This way, when doing cluster expansion, we >> can >> > > make sure that the new replicas on the new brokers are created in the >> > right >> > > log directory in the first place. We can also avoid the tool having to >> > keep >> > > issuing ChangeReplicaDirRequest in response to >> > > ReplicaNotAvailableException. >> > > >> > >> > I am concerned that the ChangeReplicaDirRequest would be lost if broker >> > restarts after it sends ChangeReplicaDirResponse but before it receives >> > LeaderAndIsrRequest. In this case, the user will receive success when >> they >> > initiate replica reassignment, but replica reassignment will never >> complete >> > when they verify the reassignment later. This would be confusing to >> user. >> > >> > There are three different approaches to this problem if broker has not >> > created replica yet after it receives ChangeReplicaDirResquest: >> > >> > 1) Broker immediately replies to user with ReplicaNotAvailableException >> and >> > user can decide to retry again later. The advantage of this solution is >> > that the broker logic is very simple and the reassignment script logic >> also >> > seems straightforward. The disadvantage is that user script has to >> retry. >> > But it seems fine - we can set interval between retries to be 0.5 sec so >> > that broker want be bombarded by those requests. This is the solution >> > chosen in the current KIP. >> > >> > 2) Broker can put ChangeReplicaDirRequest in a purgatory with timeout >> and >> > replies to user after the replica has been created. I didn't choose >> this in >> > the interest of keeping broker logic simpler. >> > >> > 3) Broker can remember that by making a mark in the disk, e.g. create >> > topicPartition.tomove directory in the destination log directory. This >> mark >> > will be persisted across broker restart. This is the first idea I had >> but I >> > replaced it with solution 1) in the interest of keeping broker simple. >> > >> > It seems that solution 1) is the simplest one that works. But I am OK to >> > switch to the other two solutions if we don't want the retry logic. >> What do >> > you think? >> > >> > >> > 11.3 Do we need an option in the tool to specify intra.broker. >> > > throttled.rate? >> > > >> > >> > I don't find it useful to add this option to >> kafka-reassign-partitions.sh. >> > The reason we have the option "--throttle" in the script to throttle >> > replication rate is that we usually want higher quota to fix an offline >> > replica to get out of URP. But we are OK to have a lower quota if we are >> > moving replica only to balance the cluster. Thus it is common for SRE to >> > use different quota when using kafka-reassign-partitions.sh to move >> replica >> > between brokers. >> > >> > However, the only reason for moving replica between log directories of >> the >> > same broker is to balance cluster resource. Thus the option to >> > specify intra.broker.throttled.rate in the tool is not that useful. I am >> > inclined not to add this option to keep this tool's usage simpler. >> > >> > >> > > >> > > 12. DescribeDirsRequest >> > > 12.1 In other requests like CreateTopicRequest, we return an empty >> list >> > in >> > > the response for an empty input list. If the input list is null, we >> > return >> > > everything. We should probably follow the same convention here. >> > > >> > >> > Thanks. I wasn't aware of this convention. I have change >> > DescribeDirsRequest so that "null" indicates "all". >> > >> > >> > > 12.2 Do we need the topics field? 
Since the request is about log >> dirs, it >> > > makes sense to specify the log dirs. But it's weird to specify topics. >> > > >> > >> > The topics field is not necessary. But it is useful to reduce the >> response >> > size in case user are only interested in the status of a few topics. For >> > example, user may have initiated the reassignment of a given replica >> from >> > one log directory to another log directory on the same broker, and the >> user >> > only wants to check the status of this given partition by looking >> > at DescribeDirsResponse. Thus this field is useful. >> > >> > I am not sure if it is weird to call this request DescribeDirsRequest. >> The >> > response is a map from log directory to information to some partitions >> on >> > the log directory. Do you think we need to change the name of the >> request? >> > >> > >> > > 12.3 DescribeDirsResponsePartition: Should we include firstOffset and >> > > nextOffset in the response? That could be useful to track the >> progress of >> > > the movement. >> > > >> > >> > Yeah good point. I agree it is useful to include logEndOffset in the >> > response. According to Log.scala doc the logEndOffset is equivalent to >> the >> > nextOffset. User can track progress by checking the difference between >> > logEndOffset of the given partition in the source and destination log >> > directories. I have added logEndOffset to the >> DescribeDirsResponsePartition >> > in the KIP. >> > >> > But it seems that we don't need firstOffset in the response. Do you >> think >> > firstOffset is still needed? >> > >> > >> > > >> > > 13. ChangeReplicaDirResponse: Do we need error code at both levels? >> > > >> > >> > My bad. It is not needed. I have removed request level error code. I >> also >> > added ChangeReplicaDirRequestTopic and ChangeReplicaDirResponseTopic to >> > reduce duplication of the "topic" string in the request and response. >> > >> > >> > > >> > > 14. num.replica.move.threads: Does it default to # log dirs? >> > > >> > >> > No. It doesn't. I expect default number to be set to a conservative >> value >> > such as 3. It may be surprising to user if the number of threads >> increase >> > just because they have assigned more log directories to Kafka broker. >> > >> > It seems that the number of replica move threads doesn't have to depend >> on >> > the number of log directories. It is possible to have one thread that >> moves >> > replicas across all log directories. On the other hand we can have >> multiple >> > threads to move replicas to the same log directory. For example, if >> broker >> > uses SSD, the CPU instead of disk IO may be the replica move bottleneck >> and >> > it will be faster to move replicas using multiple threads per log >> > directory. >> > >> > >> > > >> > > Thanks, >> > > >> > > Jun >> > > >> > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote: >> > > >> > > > I just made one correction in the KIP. If broker receives >> > > > ChangeReplicaDirRequest and the replica hasn't been created there, >> the >> > > > broker will respond ReplicaNotAvailableException. >> > > > The kafka-reassignemnt-partitions.sh will need to re-send >> > > > ChangeReplicaDirRequest in this case in order to wait for >> controller to >> > > > send LeaderAndIsrRequest to broker. The previous approach of >> creating >> > an >> > > > empty directory seems hacky. 
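A sketch of the tool-side retry loop this approach implies (the client interface below is hypothetical, not the actual tool code; the 0.5 second retry interval matches the value suggested elsewhere in this thread):

    // Keep re-sending ChangeReplicaDirRequest until the broker stops answering
    // ReplicaNotAvailableException, i.e. until the replica has been created after
    // the controller's LeaderAndIsrRequest is processed.
    class ChangeReplicaDirRetry {
        enum Outcome { NONE, REPLICA_NOT_AVAILABLE, OTHER_ERROR }

        interface ChangeReplicaDirClient {
            Outcome send(int brokerId, String topicPartition, String destinationLogDir);
        }

        static void changeReplicaDirWithRetry(ChangeReplicaDirClient client, int brokerId,
                                              String topicPartition, String destinationLogDir,
                                              long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (true) {
                Outcome outcome = client.send(brokerId, topicPartition, destinationLogDir);
                if (outcome == Outcome.NONE) {
                    return; // broker accepted the requested log directory
                }
                if (outcome == Outcome.OTHER_ERROR || System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException("ChangeReplicaDirRequest failed: " + outcome);
                }
                // Replica not created yet; wait briefly and re-send.
                Thread.sleep(500);
            }
        }
    }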
>> > > > >> > > > >> > > > >> > > > >> > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> >> wrote: >> > > > >> > > > > Hey Jun, >> > > > > >> > > > > Thanks for your comments! I have updated the KIP to address your >> > > > comments. >> > > > > Please see my reply inline. >> > > > > >> > > > > Can you let me know if the latest KIP has addressed your comments? >> > > > > >> > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io> wrote: >> > > > > >> > > > >> Hi, Dong, >> > > > >> >> > > > >> Thanks for the reply. >> > > > >> >> > > > >> 1.3 So the thread gets the lock, checks if caught up and releases >> > the >> > > > lock >> > > > >> if not? Then, in the case when there is continuous incoming data, >> > the >> > > > >> thread may never get a chance to swap. One way to address this is >> > when >> > > > the >> > > > >> thread is getting really close in catching up, just hold onto the >> > lock >> > > > >> until the thread fully catches up. >> > > > >> >> > > > > >> > > > > Yes, that was my original solution. I see your point that the lock >> > may >> > > > not >> > > > > be fairly assigned to ReplicaMoveThread and RequestHandlerThread >> when >> > > > there >> > > > > is frequent incoming requets. You solution should address the >> problem >> > > > and I >> > > > > have updated the KIP to use it. >> > > > > >> > > > > >> > > > >> >> > > > >> 2.3 So, you are saying that the partition reassignment tool can >> > first >> > > > send >> > > > >> a ChangeReplicaDirRequest to relevant brokers to establish the >> log >> > dir >> > > > for >> > > > >> replicas not created yet, then trigger the partition movement >> across >> > > > >> brokers through the controller? That's actually a good idea. >> Then, >> > we >> > > > can >> > > > >> just leave LeaderAndIsrRequest as it is. >> > > > > >> > > > > >> > > > > Yes, that is what I plan to do. If broker receives a >> > > > > ChangeReplicaDirRequest while it is not leader or follower of the >> > > > > partition, the broker will create an empty Log instance (i.e. a >> > > directory >> > > > > named topicPartition) in the destination log directory so that the >> > > > replica >> > > > > will be placed there when broker receives LeaderAndIsrRequest from >> > the >> > > > > broker. The broker should clean up empty those Log instances on >> > startup >> > > > > just in case a ChangeReplicaDirRequest was mistakenly sent to a >> > broker >> > > > that >> > > > > was not meant to be follower/leader of the partition.. >> > > > > >> > > > > >> > > > >> Another thing related to >> > > > >> ChangeReplicaDirRequest. >> > > > >> Since this request may take long to complete, I am not sure if we >> > > should >> > > > >> wait for the movement to complete before respond. While waiting >> for >> > > the >> > > > >> movement to complete, the idle connection may be killed or the >> > client >> > > > may >> > > > >> be gone already. An alternative is to return immediately and add >> a >> > new >> > > > >> request like CheckReplicaDirRequest to see if the movement has >> > > > completed. >> > > > >> The tool can take advantage of that to check the status. >> > > > >> >> > > > > >> > > > > I agree with your concern and solution. We need request to query >> the >> > > > > partition -> log_directory mapping on the broker. I have updated >> the >> > > KIP >> > > > to >> > > > > remove need for ChangeReplicaDirRequestPurgatory. 
>> > > > > Instead, kafka-reassignemnt-partitions.sh will send >> > > DescribeDirsRequest >> > > > > to brokers when user wants to verify the partition assignment. >> Since >> > we >> > > > > need this DescribeDirsRequest anyway, we can also use this >> request to >> > > > > expose stats like the individual log size instead of using JMX. >> One >> > > > > drawback of using JMX is that user has to manage the JMX port and >> > > related >> > > > > credentials if they haven't already done this, which is the case >> at >> > > > > LinkedIn. >> > > > > >> > > > > >> > > > >> Thanks, >> > > > >> >> > > > >> Jun >> > > > >> >> > > > >> >> > > > >> >> > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com> >> > wrote: >> > > > >> >> > > > >> > Hey Jun, >> > > > >> > >> > > > >> > Thanks for the detailed explanation. I will use the separate >> > thread >> > > > >> pool to >> > > > >> > move replica between log directories. I will let you know when >> the >> > > KIP >> > > > >> has >> > > > >> > been updated to use a separate thread pool. >> > > > >> > >> > > > >> > Here is my response to your other questions: >> > > > >> > >> > > > >> > 1.3 My idea is that the ReplicaMoveThread that moves data >> should >> > get >> > > > the >> > > > >> > lock before checking whether the replica in the destination log >> > > > >> directory >> > > > >> > has caught up. If the new replica has caught up, then the >> > > > >> ReplicaMoveThread >> > > > >> > should swaps the replica while it is still holding the lock. >> The >> > > > >> > ReplicaFetcherThread or RequestHandlerThread will not be able >> to >> > > > append >> > > > >> > data to the replica in the source replica during this period >> > because >> > > > >> they >> > > > >> > can not get the lock. Does this address the problem? >> > > > >> > >> > > > >> > 2.3 I get your point that we want to keep controller simpler. >> If >> > > admin >> > > > >> tool >> > > > >> > can send ChangeReplicaDirRequest to move data within a broker, >> > then >> > > > >> > controller probably doesn't even need to include log directory >> > path >> > > in >> > > > >> the >> > > > >> > LeaderAndIsrRequest. How about this: controller will only deal >> > with >> > > > >> > reassignment across brokers as it does now. If user specified >> > > > >> destination >> > > > >> > replica for any disk, the admin tool will send >> > > ChangeReplicaDirRequest >> > > > >> and >> > > > >> > wait for response from broker to confirm that all replicas have >> > been >> > > > >> moved >> > > > >> > to the destination log direcotry. The broker will put >> > > > >> > ChangeReplicaDirRequset in a purgatory and respond either when >> the >> > > > >> movement >> > > > >> > is completed or when the request has timed-out. >> > > > >> > >> > > > >> > 4. I agree that we can expose these metrics via JMX. But I am >> not >> > > sure >> > > > >> if >> > > > >> > it can be obtained easily with good performance using either >> > > existing >> > > > >> tools >> > > > >> > or new script in kafka. I will ask SREs for their opinion. >> > > > >> > >> > > > >> > Thanks, >> > > > >> > Dong >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io> >> wrote: >> > > > >> > >> > > > >> > > Hi, Dong, >> > > > >> > > >> > > > >> > > Thanks for the updated KIP. A few more comments below. 
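The 1.3 locking scheme discussed above (check catch-up and swap while holding the same lock that request handler and fetcher threads need for appends, and, per Jun's refinement, keep the lock once the gap is very small) could be sketched roughly as follows; the lock granularity and names are illustrative:

    import java.util.concurrent.locks.ReentrantLock;

    class ReplicaSwapper {
        interface LogHandle { long logEndOffset(); }

        private final ReentrantLock partitionLock = new ReentrantLock();

        // Returns true if the swap happened on this attempt.
        boolean maybeSwap(LogHandle sourceLog, LogHandle moveLog) {
            partitionLock.lock();
            try {
                // No thread can append to the source log while we hold the lock,
                // so this comparison cannot be invalidated concurrently.
                if (moveLog.logEndOffset() < sourceLog.logEndOffset()) {
                    // Not caught up yet. When the remaining gap is tiny, the thread
                    // could keep the lock and copy the last few messages here rather
                    // than releasing and re-checking forever under constant traffic.
                    return false;
                }
                swap(sourceLog, moveLog);
                return true;
            } finally {
                partitionLock.unlock();
            }
        }

        private void swap(LogHandle source, LogHandle destination) {
            // Rename the topicPartition.move directory to topicPartition and replace
            // the in-memory Log reference; details are in the KIP.
        }
    }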
>> > > > >> > > >> > > > >> > > 1.1 and 1.2: I am still not sure there is enough benefit of >> > > reusing >> > > > >> > > ReplicaFetchThread >> > > > >> > > to move data across disks. >> > > > >> > > (a) A big part of ReplicaFetchThread is to deal with issuing >> and >> > > > >> tracking >> > > > >> > > fetch requests. So, it doesn't feel that we get much from >> > reusing >> > > > >> > > ReplicaFetchThread >> > > > >> > > only to disable the fetching part. >> > > > >> > > (b) The leader replica has no ReplicaFetchThread to start >> with. >> > It >> > > > >> feels >> > > > >> > > weird to start one just for intra broker data movement. >> > > > >> > > (c) The ReplicaFetchThread is per broker. Intuitively, the >> > number >> > > of >> > > > >> > > threads doing intra broker data movement should be related to >> > the >> > > > >> number >> > > > >> > of >> > > > >> > > disks in the broker, not the number of brokers in the >> cluster. >> > > > >> > > (d) If the destination disk fails, we want to stop the intra >> > > broker >> > > > >> data >> > > > >> > > movement, but want to continue inter broker replication. So, >> > > > >> logically, >> > > > >> > it >> > > > >> > > seems it's better to separate out the two. >> > > > >> > > (e) I am also not sure if we should reuse the existing >> > throttling >> > > > for >> > > > >> > > replication. It's designed to handle traffic across brokers >> and >> > > the >> > > > >> > > delaying is done in the fetch request. So, if we are not >> doing >> > > > >> > > fetching in ReplicaFetchThread, >> > > > >> > > I am not sure the existing throttling is effective. Also, >> when >> > > > >> specifying >> > > > >> > > the throttling of moving data across disks, it seems the user >> > > > >> shouldn't >> > > > >> > > care about whether a replica is a leader or a follower. >> Reusing >> > > the >> > > > >> > > existing throttling config name will be awkward in this >> regard. >> > > > >> > > (f) It seems it's simpler and more consistent to use a >> separate >> > > > thread >> > > > >> > pool >> > > > >> > > for local data movement (for both leader and follower >> replicas). >> > > > This >> > > > >> > > process can then be configured (e.g. number of threads, etc) >> and >> > > > >> > throttled >> > > > >> > > independently. >> > > > >> > > >> > > > >> > > 1.3 Yes, we will need some synchronization there. So, if the >> > > > movement >> > > > >> > > thread catches up, gets the lock to do the swap, but realizes >> > that >> > > > new >> > > > >> > data >> > > > >> > > is added, it has to continue catching up while holding the >> lock? >> > > > >> > > >> > > > >> > > 2.3 The benefit of including the desired log directory in >> > > > >> > > LeaderAndIsrRequest >> > > > >> > > during partition reassignment is that the controller doesn't >> > need >> > > to >> > > > >> > track >> > > > >> > > the progress for disk movement. So, you don't need the >> > additional >> > > > >> > > BrokerDirStateUpdateRequest. Then the controller never needs >> to >> > > > issue >> > > > >> > > ChangeReplicaDirRequest. >> > > > >> > > Only the admin tool will issue ChangeReplicaDirRequest to >> move >> > > data >> > > > >> > within >> > > > >> > > a broker. I agree that this makes LeaderAndIsrRequest more >> > > > >> complicated, >> > > > >> > but >> > > > >> > > that seems simpler than changing the controller to track >> > > additional >> > > > >> > states >> > > > >> > > during partition reassignment. >> > > > >> > > >> > > > >> > > 4. 
We want to make a decision on how to expose the stats. So >> > far, >> > > we >> > > > >> are >> > > > >> > > exposing stats like the individual log size as JMX. So, one >> way >> > is >> > > > to >> > > > >> > just >> > > > >> > > add new jmx to expose the log directory of individual >> replicas. >> > > > >> > > >> > > > >> > > Thanks, >> > > > >> > > >> > > > >> > > Jun >> > > > >> > > >> > > > >> > > >> > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin < >> lindon...@gmail.com> >> > > > >> wrote: >> > > > >> > > >> > > > >> > > > Hey Jun, >> > > > >> > > > >> > > > >> > > > Thanks for all the comments! Please see my answer below. I >> > have >> > > > >> updated >> > > > >> > > the >> > > > >> > > > KIP to address most of the questions and make the KIP >> easier >> > to >> > > > >> > > understand. >> > > > >> > > > >> > > > >> > > > Thanks, >> > > > >> > > > Dong >> > > > >> > > > >> > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io> >> > > wrote: >> > > > >> > > > >> > > > >> > > > > Hi, Dong, >> > > > >> > > > > >> > > > >> > > > > Thanks for the KIP. A few comments below. >> > > > >> > > > > >> > > > >> > > > > 1. For moving data across directories >> > > > >> > > > > 1.1 I am not sure why we want to use >> ReplicaFetcherThread to >> > > > move >> > > > >> > data >> > > > >> > > > > around in the leader. ReplicaFetchThread fetches data >> from >> > > > socket. >> > > > >> > For >> > > > >> > > > > moving data locally, it seems that we want to avoid the >> > socket >> > > > >> > > overhead. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > The purpose of using ReplicaFetchThread is to re-use >> existing >> > > > thread >> > > > >> > > > instead of creating more threads and make our thread model >> > more >> > > > >> > complex. >> > > > >> > > It >> > > > >> > > > seems like a nature choice for copying data between disks >> > since >> > > it >> > > > >> is >> > > > >> > > > similar to copying data between brokers. Another reason is >> > that >> > > if >> > > > >> the >> > > > >> > > > replica to be moved is a follower, we don't need lock to >> swap >> > > > >> replicas >> > > > >> > > when >> > > > >> > > > destination replica has caught up, since the same thread >> which >> > > is >> > > > >> > > fetching >> > > > >> > > > data from leader will swap the replica. >> > > > >> > > > >> > > > >> > > > The ReplicaFetchThread will not incur socket overhead while >> > > > copying >> > > > >> > data >> > > > >> > > > between disks. It will read directly from source disk (as >> we >> > do >> > > > when >> > > > >> > > > processing FetchRequest) and write to destination disk (as >> we >> > do >> > > > >> when >> > > > >> > > > processing ProduceRequest). >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > 1.2 I am also not sure about moving data in the >> > > > >> ReplicaFetcherThread >> > > > >> > in >> > > > >> > > > the >> > > > >> > > > > follower. For example, I am not sure setting >> > > > >> replica.fetch.max.wait >> > > > >> > to >> > > > >> > > 0 >> > > > >> > > > > is ideal. It may not always be effective since a fetch >> > > request >> > > > in >> > > > >> > the >> > > > >> > > > > ReplicaFetcherThread could be arbitrarily delayed due to >> > > > >> replication >> > > > >> > > > > throttling on the leader. In general, the data movement >> > logic >> > > > >> across >> > > > >> > > > disks >> > > > >> > > > > seems different from that in ReplicaFetcherThread. 
So, I >> am >> > > not >> > > > >> sure >> > > > >> > > why >> > > > >> > > > > they need to be coupled. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > While it may not be the most efficient way to copy data >> > between >> > > > >> local >> > > > >> > > > disks, it will be at least as efficient as copying data >> from >> > > > leader >> > > > >> to >> > > > >> > > the >> > > > >> > > > destination disk. The expected goal of KIP-113 is to enable >> > data >> > > > >> > movement >> > > > >> > > > between disks with no less efficiency than what we do now >> when >> > > > >> moving >> > > > >> > > data >> > > > >> > > > between brokers. I think we can optimize its performance >> using >> > > > >> separate >> > > > >> > > > thread if the performance is not good enough. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > 1.3 Could you add a bit more details on how we swap the >> > > replicas >> > > > >> when >> > > > >> > > the >> > > > >> > > > > new ones are fully caught up? For example, what happens >> when >> > > the >> > > > >> new >> > > > >> > > > > replica in the new log directory is caught up, but when >> we >> > > want >> > > > >> to do >> > > > >> > > the >> > > > >> > > > > swap, some new data has arrived? >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > If the replica is a leader, then ReplicaFetcherThread will >> > > perform >> > > > >> the >> > > > >> > > > replacement. Proper lock is needed to prevent >> > > KafkaRequestHandler >> > > > >> from >> > > > >> > > > appending data to the topicPartition.log on the source >> disks >> > > > before >> > > > >> > this >> > > > >> > > > replacement is completed by ReplicaFetcherThread. >> > > > >> > > > >> > > > >> > > > If the replica is a follower, because the same >> > > ReplicaFetchThread >> > > > >> which >> > > > >> > > > fetches data from leader will also swap the replica , no >> lock >> > is >> > > > >> > needed. >> > > > >> > > > >> > > > >> > > > I have updated the KIP to specify both more explicitly. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > 1.4 Do we need to do the .move at the log segment level >> or >> > > could >> > > > >> we >> > > > >> > > just >> > > > >> > > > do >> > > > >> > > > > that at the replica directory level? Renaming just a >> > directory >> > > > is >> > > > >> > much >> > > > >> > > > > faster than renaming the log segments. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > Great point. I have updated the KIP to rename the log >> > directory >> > > > >> > instead. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > 1.5 Could you also describe a bit what happens when >> either >> > the >> > > > >> source >> > > > >> > > or >> > > > >> > > > > the target log directory fails while the data moving is >> in >> > > > >> progress? >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > If source log directory fails, then the replica movement >> will >> > > stop >> > > > >> and >> > > > >> > > the >> > > > >> > > > source replica is marked offline. If destination log >> directory >> > > > >> fails, >> > > > >> > > then >> > > > >> > > > the replica movement will stop. I have updated the KIP to >> > > clarify >> > > > >> this. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > >> > > > >> > > > > 2. For partition reassignment. >> > > > >> > > > > 2.1 I am not sure if the controller can block on >> > > > >> > > ChangeReplicaDirRequest. >> > > > >> > > > > Data movement may take a long time to complete. 
If there >> is >> > an >> > > > >> > > > outstanding >> > > > >> > > > > request from the controller to a broker, that broker >> won't >> > be >> > > > >> able to >> > > > >> > > > > process any new request from the controller. So if >> another >> > > event >> > > > >> > (e.g. >> > > > >> > > > > broker failure) happens when the data movement is in >> > progress, >> > > > >> > > subsequent >> > > > >> > > > > LeaderAnIsrRequest will be delayed. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > Yeah good point. I missed the fact that there is be only >> one >> > > > >> inflight >> > > > >> > > > request from controller to broker. >> > > > >> > > > >> > > > >> > > > How about I add a request, e.g. >> BrokerDirStateUpdateRequest, >> > > which >> > > > >> maps >> > > > >> > > > topicPartition to log directory and can be sent from >> broker to >> > > > >> > controller >> > > > >> > > > to indicate completion? >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > 2.2 in the KIP, the partition reassignment tool is also >> used >> > > for >> > > > >> > cases >> > > > >> > > > > where an admin just wants to balance the existing data >> > across >> > > > log >> > > > >> > > > > directories in the broker. In this case, it seems that >> it's >> > > over >> > > > >> > > killing >> > > > >> > > > to >> > > > >> > > > > have the process go through the controller. A simpler >> > approach >> > > > is >> > > > >> to >> > > > >> > > > issue >> > > > >> > > > > an RPC request to the broker directly. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > I agree we can optimize this case. It is just that we have >> to >> > > add >> > > > >> new >> > > > >> > > logic >> > > > >> > > > or code path to handle a scenario that is already covered >> by >> > the >> > > > >> more >> > > > >> > > > complicated scenario. I will add it to the KIP. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > 2.3 When using the partition reassignment tool to move >> > > replicas >> > > > >> > across >> > > > >> > > > > brokers, it make sense to be able to specify the log >> > directory >> > > > of >> > > > >> the >> > > > >> > > > newly >> > > > >> > > > > created replicas. The KIP does that in two separate >> requests >> > > > >> > > > > ChangeReplicaDirRequest and LeaderAndIsrRequest, and >> tracks >> > > the >> > > > >> > > progress >> > > > >> > > > of >> > > > >> > > > > each independently. An alternative is to do that just in >> > > > >> > > > > LeaderAndIsrRequest. >> > > > >> > > > > That way, the new replicas will be created in the right >> log >> > > dir >> > > > in >> > > > >> > the >> > > > >> > > > > first place and the controller just needs to track the >> > > progress >> > > > of >> > > > >> > > > > partition reassignment in the current way. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > I agree it is better to use one request instead of two to >> > > request >> > > > >> > replica >> > > > >> > > > movement between disks. But I think the performance >> advantage >> > of >> > > > >> doing >> > > > >> > so >> > > > >> > > > is negligible because we trigger replica assignment much >> less >> > > than >> > > > >> all >> > > > >> > > > other kinds of events in the Kafka cluster. I am not sure >> that >> > > the >> > > > >> > > benefit >> > > > >> > > > of doing this is worth the effort to add an optional string >> > > field >> > > > in >> > > > >> > the >> > > > >> > > > LeaderAndIsrRequest. 
Also if we add this optional field in >> the >> > > > >> > > > LeaderAndIsrRequest, we probably want to remove >> > > > >> ChangeReplicaDirRequest >> > > > >> > > to >> > > > >> > > > avoid having two requests doing the same thing. But it >> means >> > > user >> > > > >> > script >> > > > >> > > > can not send request directly to the broker to trigger >> replica >> > > > >> movement >> > > > >> > > > between log directories. >> > > > >> > > > >> > > > >> > > > I will do it if you are strong about this optimzation. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > >> > > > >> > > > > 3. /admin/reassign_partitions: Including the log dir in >> > every >> > > > >> replica >> > > > >> > > may >> > > > >> > > > > not be efficient. We could include a list of log >> directories >> > > and >> > > > >> > > > reference >> > > > >> > > > > the index of the log directory in each replica. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > Good point. I have updated the KIP to use this solution. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > >> > > > >> > > > > 4. DescribeDirsRequest: The stats in the request are >> already >> > > > >> > available >> > > > >> > > > from >> > > > >> > > > > JMX. Do we need the new request? >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > Does JMX also include the state (i.e. offline or online) of >> > each >> > > > log >> > > > >> > > > directory and the log directory of each replica? If not, >> then >> > > > maybe >> > > > >> we >> > > > >> > > > still need DescribeDirsRequest? >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > >> > > > >> > > > > 5. We want to be consistent on ChangeReplicaDirRequest vs >> > > > >> > > > > ChangeReplicaRequest. >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > > I think ChangeReplicaRequest and ChangeReplicaResponse is >> my >> > > typo. >> > > > >> > Sorry, >> > > > >> > > > they are fixed now. >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > >> > > > >> > > > > Thanks, >> > > > >> > > > > >> > > > >> > > > > Jun >> > > > >> > > > > >> > > > >> > > > > >> > > > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin < >> > lindon...@gmail.com >> > > > >> > > > >> > wrote: >> > > > >> > > > > >> > > > >> > > > > > Hey ALexey, >> > > > >> > > > > > >> > > > >> > > > > > Thanks for all the comments! >> > > > >> > > > > > >> > > > >> > > > > > I have updated the KIP to specify how we enforce >> quota. I >> > > also >> > > > >> > > updated >> > > > >> > > > > the >> > > > >> > > > > > "The thread model and broker logic for moving replica >> data >> > > > >> between >> > > > >> > > log >> > > > >> > > > > > directories" to make it easier to read. You can find >> the >> > > exact >> > > > >> > change >> > > > >> > > > > here >> > > > >> > > > > > <https://cwiki.apache.org/conf >> > > luence/pages/diffpagesbyversio >> > > > >> > > > > > n.action?pageId=67638408&selec >> > > tedPageVersions=5&selectedPage >> > > > >> > > > Versions=6>. >> > > > >> > > > > > The idea is to use the same replication quota mechanism >> > > > >> introduced >> > > > >> > in >> > > > >> > > > > > KIP-73. 
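A rough picture of what pacing the intra-broker copy against intra.broker.throttled.rate could look like, in the spirit of the KIP-73 quota reuse mentioned above (illustrative only, not the actual quota implementation):

    // The move thread calls throttle() after each copied chunk and sleeps whenever
    // it is running ahead of the configured byte rate.
    class IntraBrokerMoveThrottler {
        private final long bytesPerSecond;
        private final long startMs = System.currentTimeMillis();
        private long totalBytes = 0;

        IntraBrokerMoveThrottler(long bytesPerSecond) {
            this.bytesPerSecond = bytesPerSecond;
        }

        void throttle(long bytesJustCopied) throws InterruptedException {
            totalBytes += bytesJustCopied;
            long elapsedMs = System.currentTimeMillis() - startMs;
            long minimumMsForBytes = totalBytes * 1000 / bytesPerSecond;
            if (minimumMsForBytes > elapsedMs) {
                Thread.sleep(minimumMsForBytes - elapsedMs);
            }
        }
    }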
>> > > > >> > > > > > >> > > > >> > > > > > Thanks, >> > > > >> > > > > > Dong >> > > > >> > > > > > >> > > > >> > > > > > >> > > > >> > > > > > >> > > > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky < >> > > > >> > > aozerit...@yandex.ru >> > > > >> > > > > >> > > > >> > > > > > wrote: >> > > > >> > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>: >> > > > >> > > > > > > > Hey Alexey, >> > > > >> > > > > > > > >> > > > >> > > > > > > > Thanks. I think we agreed that the suggested >> solution >> > > > >> doesn't >> > > > >> > > work >> > > > >> > > > in >> > > > >> > > > > > > > general for kafka users. To answer your questions: >> > > > >> > > > > > > > >> > > > >> > > > > > > > 1. I agree we need quota to rate limit replica >> > movement >> > > > >> when a >> > > > >> > > > broker >> > > > >> > > > > > is >> > > > >> > > > > > > > moving a "leader" replica. I will come up with >> > solution, >> > > > >> > probably >> > > > >> > > > > > re-use >> > > > >> > > > > > > > the config of replication quota introduced in >> KIP-73. >> > > > >> > > > > > > > >> > > > >> > > > > > > > 2. Good point. I agree that this is a problem in >> > > general. >> > > > >> If is >> > > > >> > > no >> > > > >> > > > > new >> > > > >> > > > > > > data >> > > > >> > > > > > > > on that broker, with current default value of >> > > > >> > > > > > replica.fetch.wait.max.ms >> > > > >> > > > > > > > and replica.fetch.max.bytes, the replica will be >> moved >> > > at >> > > > >> only >> > > > >> > 2 >> > > > >> > > > MBps >> > > > >> > > > > > > > throughput. I think the solution is for broker to >> set >> > > > >> > > > > > > > replica.fetch.wait.max.ms to 0 in its >> FetchRequest if >> > > the >> > > > >> > > > > > corresponding >> > > > >> > > > > > > > ReplicaFetcherThread needs to move some replica to >> > > another >> > > > >> > disk. >> > > > >> > > > > > > > >> > > > >> > > > > > > > 3. I have updated the KIP to mention that the read >> > size >> > > > of a >> > > > >> > > given >> > > > >> > > > > > > > partition is configured using >> replica.fetch.max.bytes >> > > when >> > > > >> we >> > > > >> > > move >> > > > >> > > > > > > replicas >> > > > >> > > > > > > > between disks. >> > > > >> > > > > > > > >> > > > >> > > > > > > > Please see this >> > > > >> > > > > > > > <https://cwiki.apache.org/conf >> > > > >> luence/pages/diffpagesbyversio >> > > > >> > > > n.action >> > > > >> > > > > ? >> > > > >> > > > > > > pageId=67638408&selectedPageVe >> > > > rsions=4&selectedPageVersions= >> > > > >> 5> >> > > > >> > > > > > > > for the change of the KIP. I will come up with a >> > > solution >> > > > to >> > > > >> > > > throttle >> > > > >> > > > > > > > replica movement when a broker is moving a "leader" >> > > > replica. >> > > > >> > > > > > > >> > > > >> > > > > > > Thanks. It looks great. >> > > > >> > > > > > > >> > > > >> > > > > > > > >> > > > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky < >> > > > >> > > > > > aozerit...@yandex.ru> >> > > > >> > > > > > > > wrote: >> > > > >> > > > > > > > >> > > > >> > > > > > > >> 23.01.2017, 22:11, "Dong Lin" < >> lindon...@gmail.com >> > >: >> > > > >> > > > > > > >> > Thanks. Please see my comment inline. 
>> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > On Mon, Jan 23, 2017 at 6:45 AM, Alexey >> Ozeritsky >> > < >> > > > >> > > > > > > aozerit...@yandex.ru> >> > > > >> > > > > > > >> > wrote: >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> >> 13.01.2017, 22:29, "Dong Lin" < >> > lindon...@gmail.com >> > > >: >> > > > >> > > > > > > >> >> > Hey Alexey, >> > > > >> > > > > > > >> >> > >> > > > >> > > > > > > >> >> > Thanks for your review and the alternative >> > > > approach. >> > > > >> > Here >> > > > >> > > is >> > > > >> > > > > my >> > > > >> > > > > > > >> >> > understanding of your patch. kafka's >> background >> > > > >> threads >> > > > >> > > are >> > > > >> > > > > used >> > > > >> > > > > > > to >> > > > >> > > > > > > >> move >> > > > >> > > > > > > >> >> > data between replicas. When data movement is >> > > > >> triggered, >> > > > >> > > the >> > > > >> > > > > log >> > > > >> > > > > > > will >> > > > >> > > > > > > >> be >> > > > >> > > > > > > >> >> > rolled and the new logs will be put in the >> new >> > > > >> > directory, >> > > > >> > > > and >> > > > >> > > > > > > >> background >> > > > >> > > > > > > >> >> > threads will move segment from old >> directory to >> > > new >> > > > >> > > > directory. >> > > > >> > > > > > > >> >> > >> > > > >> > > > > > > >> >> > It is important to note that KIP-112 is >> > intended >> > > to >> > > > >> work >> > > > >> > > > with >> > > > >> > > > > > > >> KIP-113 to >> > > > >> > > > > > > >> >> > support JBOD. I think your solution is >> > definitely >> > > > >> > simpler >> > > > >> > > > and >> > > > >> > > > > > > better >> > > > >> > > > > > > >> >> under >> > > > >> > > > > > > >> >> > the current kafka implementation that a >> broker >> > > will >> > > > >> fail >> > > > >> > > if >> > > > >> > > > > any >> > > > >> > > > > > > disk >> > > > >> > > > > > > >> >> fails. >> > > > >> > > > > > > >> >> > But I am not sure if we want to allow >> broker to >> > > run >> > > > >> with >> > > > >> > > > > partial >> > > > >> > > > > > > >> disks >> > > > >> > > > > > > >> >> > failure. Let's say the a replica is being >> moved >> > > > from >> > > > >> > > > > log_dir_old >> > > > >> > > > > > > to >> > > > >> > > > > > > >> >> > log_dir_new and then log_dir_old stops >> working >> > > due >> > > > to >> > > > >> > disk >> > > > >> > > > > > > failure. >> > > > >> > > > > > > >> How >> > > > >> > > > > > > >> >> > would your existing patch handles it? To >> make >> > the >> > > > >> > > scenario a >> > > > >> > > > > bit >> > > > >> > > > > > > more >> > > > >> > > > > > > >> >> >> > > > >> > > > > > > >> >> We will lose log_dir_old. After broker >> restart we >> > > can >> > > > >> read >> > > > >> > > the >> > > > >> > > > > > data >> > > > >> > > > > > > >> from >> > > > >> > > > > > > >> >> log_dir_new. >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > No, you probably can't. This is because the >> broker >> > > > >> doesn't >> > > > >> > > have >> > > > >> > > > > > > *all* the >> > > > >> > > > > > > >> > data for this partition. For example, say the >> > broker >> > > > has >> > > > >> > > > > > > >> > partition_segement_1, partition_segment_50 and >> > > > >> > > > > > partition_segment_100 >> > > > >> > > > > > > on >> > > > >> > > > > > > >> the >> > > > >> > > > > > > >> > log_dir_old. 
partition_segment_100, which has >> the >> > > > latest >> > > > >> > > data, >> > > > >> > > > > has >> > > > >> > > > > > > been >> > > > >> > > > > > > >> > moved to log_dir_new, and the log_dir_old fails >> > > before >> > > > >> > > > > > > >> partition_segment_50 >> > > > >> > > > > > > >> > and partition_segment_1 is moved to >> log_dir_new. >> > > When >> > > > >> > broker >> > > > >> > > > > > > re-starts, >> > > > >> > > > > > > >> it >> > > > >> > > > > > > >> > won't have partition_segment_50. This causes >> > problem >> > > > if >> > > > >> > > broker >> > > > >> > > > is >> > > > >> > > > > > > elected >> > > > >> > > > > > > >> > leader and consumer wants to consume data in >> the >> > > > >> > > > > > partition_segment_1. >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> Right. >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> >> > complicated, let's say the broker is >> shtudown, >> > > > >> > > log_dir_old's >> > > > >> > > > > > disk >> > > > >> > > > > > > >> fails, >> > > > >> > > > > > > >> >> > and the broker starts. In this case broker >> > > doesn't >> > > > >> even >> > > > >> > > know >> > > > >> > > > > if >> > > > >> > > > > > > >> >> log_dir_new >> > > > >> > > > > > > >> >> > has all the data needed for this replica. It >> > > > becomes >> > > > >> a >> > > > >> > > > problem >> > > > >> > > > > > if >> > > > >> > > > > > > the >> > > > >> > > > > > > >> >> > broker is elected leader of this partition >> in >> > > this >> > > > >> case. >> > > > >> > > > > > > >> >> >> > > > >> > > > > > > >> >> log_dir_new contains the most recent data so >> we >> > > will >> > > > >> lose >> > > > >> > > the >> > > > >> > > > > tail >> > > > >> > > > > > > of >> > > > >> > > > > > > >> >> partition. >> > > > >> > > > > > > >> >> This is not a big problem for us because we >> > already >> > > > >> delete >> > > > >> > > > tails >> > > > >> > > > > > by >> > > > >> > > > > > > >> hand >> > > > >> > > > > > > >> >> (see https://issues.apache.org/jira >> > > > /browse/KAFKA-1712 >> > > > >> ). >> > > > >> > > > > > > >> >> Also we dont use authomatic leader balancing >> > > > >> > > > > > > >> (auto.leader.rebalance.enable=false), >> > > > >> > > > > > > >> >> so this partition becomes the leader with a >> low >> > > > >> > probability. >> > > > >> > > > > > > >> >> I think my patch can be modified to prohibit >> the >> > > > >> selection >> > > > >> > > of >> > > > >> > > > > the >> > > > >> > > > > > > >> leader >> > > > >> > > > > > > >> >> until the partition does not move completely. >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > I guess you are saying that you have deleted >> the >> > > tails >> > > > >> by >> > > > >> > > hand >> > > > >> > > > in >> > > > >> > > > > > > your >> > > > >> > > > > > > >> own >> > > > >> > > > > > > >> > kafka branch. But KAFKA-1712 is not accepted >> into >> > > > Kafka >> > > > >> > trunk >> > > > >> > > > > and I >> > > > >> > > > > > > am >> > > > >> > > > > > > >> not >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> No. We just modify segments mtime by cron job. >> This >> > > > works >> > > > >> > with >> > > > >> > > > > > vanilla >> > > > >> > > > > > > >> kafka. >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> > sure if it is the right solution. How would >> this >> > > > >> solution >> > > > >> > > > address >> > > > >> > > > > > the >> > > > >> > > > > > > >> > problem mentioned above? 
>> > > > >> > > > > > > >> >> > > > >> > > > > > > >> If you need only fresh data and if you remove old >> > data >> > > > by >> > > > >> > hands >> > > > >> > > > > this >> > > > >> > > > > > is >> > > > >> > > > > > > >> not a problem. But in general case >> > > > >> > > > > > > >> this is a problem of course. >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > BTW, I am not sure the solution mentioned in >> > > > KAFKA-1712 >> > > > >> is >> > > > >> > > the >> > > > >> > > > > > right >> > > > >> > > > > > > way >> > > > >> > > > > > > >> to >> > > > >> > > > > > > >> > address its problem. Now that we have >> timestamp in >> > > the >> > > > >> > > message >> > > > >> > > > we >> > > > >> > > > > > > can use >> > > > >> > > > > > > >> > that to delete old segement instead of relying >> on >> > > the >> > > > >> log >> > > > >> > > > segment >> > > > >> > > > > > > mtime. >> > > > >> > > > > > > >> > Just some idea and we don't have to discuss >> this >> > > > problem >> > > > >> > > here. >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> >> > >> > > > >> > > > > > > >> >> > The solution presented in the KIP attempts >> to >> > > > handle >> > > > >> it >> > > > >> > by >> > > > >> > > > > > > replacing >> > > > >> > > > > > > >> >> > replica in an atomic version fashion after >> the >> > > log >> > > > in >> > > > >> > the >> > > > >> > > > new >> > > > >> > > > > > dir >> > > > >> > > > > > > has >> > > > >> > > > > > > >> >> fully >> > > > >> > > > > > > >> >> > caught up with the log in the old dir. At at >> > time >> > > > the >> > > > >> > log >> > > > >> > > > can >> > > > >> > > > > be >> > > > >> > > > > > > >> >> considered >> > > > >> > > > > > > >> >> > to exist on only one log directory. >> > > > >> > > > > > > >> >> >> > > > >> > > > > > > >> >> As I understand your solution does not cover >> > > quotas. >> > > > >> > > > > > > >> >> What happens if someone starts to transfer 100 >> > > > >> partitions >> > > > >> > ? >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > Good point. Quota can be implemented in the >> > future. >> > > It >> > > > >> is >> > > > >> > > > > currently >> > > > >> > > > > > > >> > mentioned as as a potential future improvement >> in >> > > > >> KIP-112 >> > > > >> > > > > > > >> > <https://cwiki.apache.org/conf >> > > > luence/display/KAFKA/KIP- >> > > > >> > 112%3 >> > > > >> > > > > > > >> A+Handle+disk+failure+for+JBOD>.Thanks >> > > > >> > > > > > > >> > for the reminder. I will move it to KIP-113. >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> >> > If yes, it will read a ByteBufferMessageSet >> > from >> > > > >> > > > > > > topicPartition.log >> > > > >> > > > > > > >> and >> > > > >> > > > > > > >> >> append the message set to topicPartition.move >> > > > >> > > > > > > >> >> >> > > > >> > > > > > > >> >> i.e. processPartitionData will read data from >> the >> > > > >> > beginning >> > > > >> > > of >> > > > >> > > > > > > >> >> topicPartition.log? What is the read size? >> > > > >> > > > > > > >> >> A ReplicaFetchThread reads many partitions so >> if >> > > one >> > > > >> does >> > > > >> > > some >> > > > >> > > > > > > >> complicated >> > > > >> > > > > > > >> >> work (= read a lot of data from disk) >> everything >> > > will >> > > > >> slow >> > > > >> > > > down. >> > > > >> > > > > > > >> >> I think read size should not be very big. 
>> > > > >> > > > > > > >> >> >> > > > >> > > > > > > >> >> On the other hand at this point >> > > > (processPartitionData) >> > > > >> one >> > > > >> > > can >> > > > >> > > > > use >> > > > >> > > > > > > only >> > > > >> > > > > > > >> >> the new data (ByteBufferMessageSet from >> > parameters) >> > > > and >> > > > >> > wait >> > > > >> > > > > until >> > > > >> > > > > > > >> >> (topicPartition.move.smallestOffset <= >> > > > >> > > > > > > topicPartition.log.smallestOff >> > > > >> > > > > > > >> set >> > > > >> > > > > > > >> >> && topicPartition.log.largestOffset == >> > > > >> > > > > > > topicPartition.log.largestOffs >> > > > >> > > > > > > >> et). >> > > > >> > > > > > > >> >> In this case the write speed to >> > topicPartition.move >> > > > and >> > > > >> > > > > > > >> topicPartition.log >> > > > >> > > > > > > >> >> will be the same so this will allow us to move >> > many >> > > > >> > > partitions >> > > > >> > > > > to >> > > > >> > > > > > > one >> > > > >> > > > > > > >> disk. >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > The read size of a given partition is >> configured >> > > > >> > > > > > > >> > using replica.fetch.max.bytes, which is the >> same >> > > size >> > > > >> used >> > > > >> > by >> > > > >> > > > > > > >> FetchRequest >> > > > >> > > > > > > >> > from follower to leader. If the broker is >> moving a >> > > > >> replica >> > > > >> > > for >> > > > >> > > > > > which >> > > > >> > > > > > > it >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> OK. Could you mention it in KIP? >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> > acts as a follower, the disk write rate for >> moving >> > > > this >> > > > >> > > replica >> > > > >> > > > > is >> > > > >> > > > > > at >> > > > >> > > > > > > >> most >> > > > >> > > > > > > >> > the rate it fetches from leader (assume it is >> > > catching >> > > > >> up >> > > > >> > and >> > > > >> > > > has >> > > > >> > > > > > > >> > sufficient data to read from leader, which is >> > > subject >> > > > to >> > > > >> > > > > > > round-trip-time >> > > > >> > > > > > > >> > between itself and the leader. Thus this part >> if >> > > > >> probably >> > > > >> > > fine >> > > > >> > > > > even >> > > > >> > > > > > > >> without >> > > > >> > > > > > > >> > quota. >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> I think there are 2 problems >> > > > >> > > > > > > >> 1. Without speed limiter this will not work good >> > even >> > > > for >> > > > >> 1 >> > > > >> > > > > > partition. >> > > > >> > > > > > > In >> > > > >> > > > > > > >> our production we had a problem so we did the >> > throuput >> > > > >> > limiter: >> > > > >> > > > > > > >> https://github.com/resetius/ka >> > > > >> fka/commit/cda31dadb2f135743bf >> > > > >> > > > > > > >> 41083062927886c5ddce1#diff-ffa >> > > > >> 8861e850121997a534ebdde2929c6R >> > > > >> > > 713 >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> 2. I dont understand how it will work in case of >> big >> > > > >> > > > > > > >> replica.fetch.wait.max.ms and partition with >> > > irregular >> > > > >> flow. >> > > > >> > > > > > > >> For example someone could have >> > > > replica.fetch.wait.max.ms >> > > > >> > =10mi >> > > > >> > > > nutes >> > > > >> > > > > > and >> > > > >> > > > > > > >> partition that has very high data flow from >> 12:00 to >> > > > 13:00 >> > > > >> > and >> > > > >> > > > zero >> > > > >> > > > > > > flow >> > > > >> > > > > > > >> otherwise. 
>> > > > >> > > > > > > >> In this case processPartitionData could be called >> > once >> > > > per >> > > > >> > > > > 10minutes >> > > > >> > > > > > > so if >> > > > >> > > > > > > >> we start data moving in 13:01 it will be finished >> > next >> > > > >> day. >> > > > >> > > > > > > >> >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > But ff the broker is moving a replica for >> which it >> > > > acts >> > > > >> as >> > > > >> > a >> > > > >> > > > > > leader, >> > > > >> > > > > > > as >> > > > >> > > > > > > >> of >> > > > >> > > > > > > >> > current KIP the broker will keep reading from >> > > > >> log_dir_old >> > > > >> > and >> > > > >> > > > > > append >> > > > >> > > > > > > to >> > > > >> > > > > > > >> > log_dir_new without having to wait for >> > > > round-trip-time. >> > > > >> We >> > > > >> > > > > probably >> > > > >> > > > > > > need >> > > > >> > > > > > > >> > quota for this in the future. >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> >> > >> > > > >> > > > > > > >> >> > And to answer your question, yes >> > > topicpartition.log >> > > > >> > refers >> > > > >> > > > to >> > > > >> > > > > > > >> >> > topic-paritition/segment.log. >> > > > >> > > > > > > >> >> > >> > > > >> > > > > > > >> >> > Thanks, >> > > > >> > > > > > > >> >> > Dong >> > > > >> > > > > > > >> >> > >> > > > >> > > > > > > >> >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey >> > > Ozeritsky < >> > > > >> > > > > > > >> aozerit...@yandex.ru> >> > > > >> > > > > > > >> >> > wrote: >> > > > >> > > > > > > >> >> > >> > > > >> > > > > > > >> >> >> Hi, >> > > > >> > > > > > > >> >> >> >> > > > >> > > > > > > >> >> >> We have the similar solution that have been >> > > > working >> > > > >> in >> > > > >> > > > > > production >> > > > >> > > > > > > >> since >> > > > >> > > > > > > >> >> >> 2014. You can see it here: >> > > > >> > > https://github.com/resetius/ka >> > > > >> > > > > > > >> >> >> fka/commit/20658593e246d218490 >> > > > 6879defa2e763c4d413fb >> > > > >> > > > > > > >> >> >> The idea is very simple >> > > > >> > > > > > > >> >> >> 1. Disk balancer runs in a separate thread >> > > inside >> > > > >> > > scheduler >> > > > >> > > > > > pool. >> > > > >> > > > > > > >> >> >> 2. It does not touch empty partitions >> > > > >> > > > > > > >> >> >> 3. Before it moves a partition it forcibly >> > > creates >> > > > >> new >> > > > >> > > > > segment >> > > > >> > > > > > > on a >> > > > >> > > > > > > >> >> >> destination disk >> > > > >> > > > > > > >> >> >> 4. It moves segment by segment from new to >> > old. >> > > > >> > > > > > > >> >> >> 5. Log class works with segments on both >> disks >> > > > >> > > > > > > >> >> >> >> > > > >> > > > > > > >> >> >> Your approach seems too complicated, >> moreover >> > it >> > > > >> means >> > > > >> > > that >> > > > >> > > > > you >> > > > >> > > > > > > >> have to >> > > > >> > > > > > > >> >> >> patch different components of the system >> > > > >> > > > > > > >> >> >> Could you clarify what do you mean by >> > > > >> > topicPartition.log? >> > > > >> > > > Is >> > > > >> > > > > it >> > > > >> > > > > > > >> >> >> topic-paritition/segment.log ? >> > > > >> > > > > > > >> >> >> >> > > > >> > > > > > > >> >> >> 12.01.2017, 21:47, "Dong Lin" < >> > > > lindon...@gmail.com >> > > > >> >: >> > > > >> > > > > > > >> >> >> > Hi all, >> > > > >> > > > > > > >> >> >> > >> > > > >> > > > > > > >> >> >> > We created KIP-113: Support replicas >> > movement >> > > > >> between >> > > > >> > > log >> > > > >> > > > > > > >> >> directories. 
>> > > > >> > > > > > > >> >> >> > Please find the KIP wiki in the link >> > > > >> > > > > > > >> >> >> > *https://cwiki.apache.org/conf >> > > > >> > > > > luence/display/KAFKA/KIP-113% >> > > > >> > > > > > > >> >> >> 3A+Support+replicas+movement+b >> > > > >> etween+log+directories >> > > > >> > > > > > > >> >> >> > <https://cwiki.apache.org/conf >> > > > >> > > > > luence/display/KAFKA/KIP-113% >> > > > >> > > > > > > >> >> >> 3A+Support+replicas+movement+ >> > > > >> > between+log+directories>.* >> > > > >> > > > > > > >> >> >> > >> > > > >> > > > > > > >> >> >> > This KIP is related to KIP-112 >> > > > >> > > > > > > >> >> >> > <https://cwiki.apache.org/conf >> > > > >> > > > > luence/display/KAFKA/KIP-112% >> > > > >> > > > > > > >> >> >> 3A+Handle+disk+failure+for+JBOD>: >> > > > >> > > > > > > >> >> >> > Handle disk failure for JBOD. They are >> > needed >> > > in >> > > > >> > order >> > > > >> > > to >> > > > >> > > > > > > support >> > > > >> > > > > > > >> >> JBOD in >> > > > >> > > > > > > >> >> >> > Kafka. Please help review the KIP. You >> > > feedback >> > > > is >> > > > >> > > > > > appreciated! >> > > > >> > > > > > > >> >> >> > >> > > > >> > > > > > > >> >> >> > Thanks, >> > > > >> > > > > > > >> >> >> > Dong >> > > > >> > > > > > > >> > > > >> > > > > > >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > >> > > > >> > >> > > > >> >> > > > > >> > > > > >> > > > >> > > >> > >> > >