Hi, Dong,

Thanks for the update. A few replies inlined below.
On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <lindon...@gmail.com> wrote:
> Hey Jun,
>
> Thanks for your comment! Please see my reply below.
>
> On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io> wrote:
> > Hi, Dong,
> >
> > Thanks for the reply.
> >
> > 10. Could you comment on that?
>
> Sorry, I missed that comment.
>
> Good point. I think the log segments in the topicPartition.move directory
> will be subject to log truncation, log retention and log cleaning in the
> same way as the log segments in the source log directory. I just
> specified this in the KIP.

This is ok, but doubles the overhead of log cleaning. We probably want to
think a bit more on this.

> > 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
> > broker restarts after it sends ChangeReplicaDirResponse but before it
> > receives LeaderAndIsrRequest."
> >
> > In that case, the reassignment tool could detect that through
> > DescribeDirsRequest and issue ChangeReplicaDirRequest again, right? In
> > the common case, this is probably not needed and we only need to write
> > each replica once.
> >
> > My main concern with the approach in the current KIP is that once a new
> > replica is created in the wrong log dir, the cross log directory
> > movement may not catch up until the new replica is fully bootstrapped.
> > So, we end up writing the data for the same replica twice.
>
> I agree with your concern. My main concern is that it is a bit weird if
> ChangeReplicaDirResponse can not guarantee success and the tool needs to
> rely on DescribeDirsResponse to see if it needs to send
> ChangeReplicaDirRequest again.
>
> How about this: If the broker doesn't already have a replica created for
> the specified topicPartition when it receives ChangeReplicaDirRequest, it
> will reply with ReplicaNotAvailableException AND remember the (replica,
> destination log directory) pair in memory so that it creates the replica
> in the specified log directory.

I am not sure if returning ReplicaNotAvailableException is useful? What
will the client do on receiving ReplicaNotAvailableException in this case?
Perhaps we could just replace the is_temporary field in
DescribeDirsResponsePartition with a state field. We can use 0 to indicate
the partition is created, 1 to indicate the partition is temporary and 2
to indicate that the partition is pending.

> > 11.3 Are you saying the value in --throttle will be used to set both
> > intra.broker.throttled.rate and
> > leader.follower.replication.throttled.replicas?
>
> No. --throttle will be used only to set
> leader.follower.replication.throttled.replicas as it does now. I think we
> do not need any option in kafka-reassignment-partitions.sh to specify
> intra.broker.throttled.rate. Users can set it in the broker config or
> dynamically using kafka-configs.sh. Does this sound OK?

Ok. This sounds good. It would be useful to make this clear in the wiki.

> > 12.2 If the user only wants to check one topic, the tool could do the
> > filtering on the client side, right? My concern with having both
> > log_dirs and topics is the semantic. For example, if both are not
> > empty, do we return the intersection or the union?
>
> Yes the tool could filter on the client side. But the purpose of having
> this field is to reduce the response size in case the broker has a lot of
> topics. Both fields are used as filters and the result is the
> intersection. Do you think this semantic is confusing or
> counter-intuitive?

Ok.
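To double-check my understanding, the broker-side filtering would behave
roughly like the sketch below. This is illustrative only: the helper and
its signature are hypothetical, not from the patch, and a null filter on
the wire maps to None here.

    import org.apache.kafka.common.TopicPartition

    // None means "no filtering"; when both filters are present the
    // result is the intersection of the two.
    def filterDirs(allDirs: Map[String, Set[TopicPartition]],
                   logDirs: Option[Set[String]],
                   topics: Option[Set[String]]): Map[String, Set[TopicPartition]] =
      allDirs
        .filter { case (dir, _) => logDirs.forall(_.contains(dir)) }
        .map { case (dir, partitions) =>
          dir -> partitions.filter(tp => topics.forall(_.contains(tp.topic)))
        }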
Could we document the semantic when both dirs and topics are specified?

Thanks,

Jun

> > On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for your detailed comments. Please see my reply below.
> > >
> > > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the updated KIP. Some more comments below.
> > > >
> > > > 10. For the .move log, do we perform any segment deletion (based on
> > > > retention) or log cleaning (if a compacted topic)? Or do we only
> > > > enable that after the swap?
> > > >
> > > > 11. kafka-reassign-partitions.sh
> > > > 11.1 If all reassigned replicas are in the current broker and only
> > > > the log directories have changed, we can probably optimize the tool
> > > > to not trigger partition reassignment through the controller and
> > > > only send ChangeReplicaDirRequest.
> > >
> > > Yes, the reassignment script should not create the reassignment znode
> > > if no replicas are to be moved between brokers. This falls into the
> > > "How to move replica between log directories on the same broker" part
> > > of the Proposed Changes section.
> > >
> > > > 11.2 If ChangeReplicaDirRequest specifies a replica that's not
> > > > created yet, could the broker just remember that in memory and
> > > > create the replica when the creation is requested? This way, when
> > > > doing cluster expansion, we can make sure that the new replicas on
> > > > the new brokers are created in the right log directory in the first
> > > > place. We can also avoid the tool having to keep issuing
> > > > ChangeReplicaDirRequest in response to ReplicaNotAvailableException.
> > >
> > > I am concerned that the ChangeReplicaDirRequest would be lost if the
> > > broker restarts after it sends ChangeReplicaDirResponse but before it
> > > receives LeaderAndIsrRequest. In this case, the user will receive
> > > success when they initiate replica reassignment, but replica
> > > reassignment will never complete when they verify the reassignment
> > > later. This would be confusing to the user.
> > >
> > > There are three different approaches to this problem if the broker
> > > has not created the replica yet after it receives
> > > ChangeReplicaDirRequest:
> > >
> > > 1) Broker immediately replies to the user with
> > > ReplicaNotAvailableException and the user can decide to retry again
> > > later. The advantage of this solution is that the broker logic is
> > > very simple and the reassignment script logic also seems
> > > straightforward. The disadvantage is that the user script has to
> > > retry. But it seems fine - we can set the interval between retries to
> > > be 0.5 sec so that the broker won't be bombarded by those requests.
> > > This is the solution chosen in the current KIP.
> > >
> > > 2) Broker can put the ChangeReplicaDirRequest in a purgatory with a
> > > timeout and reply to the user after the replica has been created. I
> > > didn't choose this in the interest of keeping the broker logic
> > > simpler.
> > >
> > > 3) Broker can remember that by making a mark on the disk, e.g. create
> > > a topicPartition.tomove directory in the destination log directory.
> > > This mark will be persisted across broker restarts. This is the first
> > > idea I had but I replaced it with solution 1) in the interest of
> > > keeping the broker simple.
> > >
> > > It seems that solution 1) is the simplest one that works.
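(For reference, the retry loop of solution 1) in the reassignment tool
could look roughly like the sketch below. This is illustrative only;
sendChangeReplicaDir stands in for the actual network call and is not a
real API.)

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors

    // Re-send ChangeReplicaDirRequest until the broker has created the
    // replica, sleeping 0.5 sec between retries so the broker won't be
    // bombarded.
    def moveReplicaToDir(sendChangeReplicaDir: (TopicPartition, String) => Errors,
                         tp: TopicPartition, destDir: String): Unit = {
      var error = sendChangeReplicaDir(tp, destDir)
      while (error == Errors.REPLICA_NOT_AVAILABLE) {
        Thread.sleep(500)
        error = sendChangeReplicaDir(tp, destDir)
      }
    }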
> > > But I am OK to switch to the other two solutions if we don't want
> > > the retry logic. What do you think?
> > >
> > > > 11.3 Do we need an option in the tool to specify
> > > > intra.broker.throttled.rate?
> > >
> > > I don't find it useful to add this option to
> > > kafka-reassign-partitions.sh. The reason we have the option
> > > "--throttle" in the script to throttle replication rate is that we
> > > usually want a higher quota to fix an offline replica to get out of
> > > URP. But we are OK to have a lower quota if we are moving replicas
> > > only to balance the cluster. Thus it is common for SREs to use a
> > > different quota when using kafka-reassign-partitions.sh to move
> > > replicas between brokers.
> > >
> > > However, the only reason for moving a replica between log directories
> > > of the same broker is to balance cluster resources. Thus the option
> > > to specify intra.broker.throttled.rate in the tool is not that
> > > useful. I am inclined not to add this option to keep this tool's
> > > usage simpler.
> > >
> > > > 12. DescribeDirsRequest
> > > > 12.1 In other requests like CreateTopicRequest, we return an empty
> > > > list in the response for an empty input list. If the input list is
> > > > null, we return everything. We should probably follow the same
> > > > convention here.
> > >
> > > Thanks. I wasn't aware of this convention. I have changed
> > > DescribeDirsRequest so that "null" indicates "all".
> > >
> > > > 12.2 Do we need the topics field? Since the request is about log
> > > > dirs, it makes sense to specify the log dirs. But it's weird to
> > > > specify topics.
> > >
> > > The topics field is not necessary. But it is useful to reduce the
> > > response size in case users are only interested in the status of a
> > > few topics. For example, a user may have initiated the reassignment
> > > of a given replica from one log directory to another log directory on
> > > the same broker, and the user only wants to check the status of this
> > > given partition by looking at DescribeDirsResponse. Thus this field
> > > is useful.
> > >
> > > I am not sure if it is weird to call this request
> > > DescribeDirsRequest. The response is a map from log directory to
> > > information about some partitions on the log directory. Do you think
> > > we need to change the name of the request?
> > >
> > > > 12.3 DescribeDirsResponsePartition: Should we include firstOffset
> > > > and nextOffset in the response? That could be useful to track the
> > > > progress of the movement.
> > >
> > > Yeah good point. I agree it is useful to include logEndOffset in the
> > > response. According to the Log.scala doc the logEndOffset is
> > > equivalent to the nextOffset. Users can track progress by checking
> > > the difference between the logEndOffset of the given partition in the
> > > source and destination log directories. I have added logEndOffset to
> > > the DescribeDirsResponsePartition in the KIP.
> > >
> > > But it seems that we don't need firstOffset in the response. Do you
> > > think firstOffset is still needed?
> > >
> > > > 13. ChangeReplicaDirResponse: Do we need error code at both levels?
> > >
> > > My bad. It is not needed. I have removed the request-level error
> > > code. I also added ChangeReplicaDirRequestTopic and
> > > ChangeReplicaDirResponseTopic to reduce duplication of the "topic"
> > > string in the request and response.
> > >
> > > > 14. num.replica.move.threads: Does it default to # log dirs?
> > >
> > > No. It doesn't. I expect the default number to be set to a
> > > conservative value such as 3. It may be surprising to users if the
> > > number of threads increases just because they have assigned more log
> > > directories to the Kafka broker.
> > >
> > > It seems that the number of replica move threads doesn't have to
> > > depend on the number of log directories. It is possible to have one
> > > thread that moves replicas across all log directories. On the other
> > > hand we can have multiple threads to move replicas to the same log
> > > directory. For example, if the broker uses SSD, the CPU instead of
> > > disk IO may be the replica move bottleneck and it will be faster to
> > > move replicas using multiple threads per log directory.
> > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > I just made one correction in the KIP. If the broker receives
> > > > > ChangeReplicaDirRequest and the replica hasn't been created
> > > > > there, the broker will respond with
> > > > > ReplicaNotAvailableException. kafka-reassignment-partitions.sh
> > > > > will need to re-send ChangeReplicaDirRequest in this case in
> > > > > order to wait for the controller to send LeaderAndIsrRequest to
> > > > > the broker. The previous approach of creating an empty directory
> > > > > seems hacky.
> > > > >
> > > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks for your comments! I have updated the KIP to address
> > > > > > your comments. Please see my reply inline.
> > > > > >
> > > > > > Can you let me know if the latest KIP has addressed your
> > > > > > comments?
> > > > > >
> > > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > >
> > > > > >> Hi, Dong,
> > > > > >>
> > > > > >> Thanks for the reply.
> > > > > >>
> > > > > >> 1.3 So the thread gets the lock, checks if caught up and
> > > > > >> releases the lock if not? Then, in the case when there is
> > > > > >> continuous incoming data, the thread may never get a chance to
> > > > > >> swap. One way to address this is when the thread is getting
> > > > > >> really close in catching up, just hold onto the lock until the
> > > > > >> thread fully catches up.
> > > > > >
> > > > > > Yes, that was my original solution. I see your point that the
> > > > > > lock may not be fairly assigned to the ReplicaMoveThread and
> > > > > > the RequestHandlerThread when there are frequent incoming
> > > > > > requests. Your solution should address the problem and I have
> > > > > > updated the KIP to use it.
> > > > > >
> > > > > >> 2.3 So, you are saying that the partition reassignment tool
> > > > > >> can first send a ChangeReplicaDirRequest to relevant brokers
> > > > > >> to establish the log dir for replicas not created yet, then
> > > > > >> trigger the partition movement across brokers through the
> > > > > >> controller? That's actually a good idea. Then, we can just
> > > > > >> leave LeaderAndIsrRequest as it is.
> > > > > >
> > > > > > Yes, that is what I plan to do.
> > > > > > If the broker receives a ChangeReplicaDirRequest while it is
> > > > > > not leader or follower of the partition, the broker will create
> > > > > > an empty Log instance (i.e. a directory named topicPartition)
> > > > > > in the destination log directory so that the replica will be
> > > > > > placed there when the broker receives LeaderAndIsrRequest from
> > > > > > the controller. The broker should clean up those empty Log
> > > > > > instances on startup just in case a ChangeReplicaDirRequest was
> > > > > > mistakenly sent to a broker that was not meant to be
> > > > > > follower/leader of the partition.
> > > > > >
> > > > > >> Another thing related to ChangeReplicaDirRequest. Since this
> > > > > >> request may take a long time to complete, I am not sure if we
> > > > > >> should wait for the movement to complete before responding.
> > > > > >> While waiting for the movement to complete, the idle
> > > > > >> connection may be killed or the client may be gone already. An
> > > > > >> alternative is to return immediately and add a new request
> > > > > >> like CheckReplicaDirRequest to see if the movement has
> > > > > >> completed. The tool can take advantage of that to check the
> > > > > >> status.
> > > > > >
> > > > > > I agree with your concern and solution. We need a request to
> > > > > > query the partition -> log_directory mapping on the broker. I
> > > > > > have updated the KIP to remove the need for
> > > > > > ChangeReplicaDirRequestPurgatory. Instead,
> > > > > > kafka-reassignment-partitions.sh will send DescribeDirsRequest
> > > > > > to brokers when the user wants to verify the partition
> > > > > > assignment. Since we need this DescribeDirsRequest anyway, we
> > > > > > can also use this request to expose stats like the individual
> > > > > > log size instead of using JMX. One drawback of using JMX is
> > > > > > that the user has to manage the JMX port and related
> > > > > > credentials if they haven't already done this, which is the
> > > > > > case at LinkedIn.
> > > > > >
> > > > > >> Thanks,
> > > > > >>
> > > > > >> Jun
> > > > > >>
> > > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hey Jun,
> > > > > >> >
> > > > > >> > Thanks for the detailed explanation. I will use a separate
> > > > > >> > thread pool to move replicas between log directories. I will
> > > > > >> > let you know when the KIP has been updated to use a separate
> > > > > >> > thread pool.
> > > > > >> >
> > > > > >> > Here is my response to your other questions:
> > > > > >> >
> > > > > >> > 1.3 My idea is that the ReplicaMoveThread that moves data
> > > > > >> > should get the lock before checking whether the replica in
> > > > > >> > the destination log directory has caught up. If the new
> > > > > >> > replica has caught up, then the ReplicaMoveThread should
> > > > > >> > swap the replicas while it is still holding the lock. The
> > > > > >> > ReplicaFetcherThread or RequestHandlerThread will not be
> > > > > >> > able to append data to the replica in the source log
> > > > > >> > directory during this period because they can not get the
> > > > > >> > lock. Does this address the problem?
> > > > > >> >
> > > > > >> > 2.3 I get your point that we want to keep the controller
> > > > > >> > simpler.
> > > > > >> > If the admin tool can send ChangeReplicaDirRequest to move
> > > > > >> > data within a broker, then the controller probably doesn't
> > > > > >> > even need to include the log directory path in the
> > > > > >> > LeaderAndIsrRequest. How about this: the controller will
> > > > > >> > only deal with reassignment across brokers as it does now.
> > > > > >> > If the user specified a destination disk for any replica,
> > > > > >> > the admin tool will send ChangeReplicaDirRequest and wait
> > > > > >> > for the response from the broker to confirm that all
> > > > > >> > replicas have been moved to the destination log directory.
> > > > > >> > The broker will put the ChangeReplicaDirRequest in a
> > > > > >> > purgatory and respond either when the movement is completed
> > > > > >> > or when the request has timed out.
> > > > > >> >
> > > > > >> > 4. I agree that we can expose these metrics via JMX. But I
> > > > > >> > am not sure if they can be obtained easily with good
> > > > > >> > performance using either existing tools or a new script in
> > > > > >> > Kafka. I will ask SREs for their opinion.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Dong
> > > > > >> >
> > > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > >> >
> > > > > >> > > Hi, Dong,
> > > > > >> > >
> > > > > >> > > Thanks for the updated KIP. A few more comments below.
> > > > > >> > >
> > > > > >> > > 1.1 and 1.2: I am still not sure there is enough benefit
> > > > > >> > > of reusing ReplicaFetchThread to move data across disks.
> > > > > >> > > (a) A big part of ReplicaFetchThread is to deal with
> > > > > >> > > issuing and tracking fetch requests. So, it doesn't feel
> > > > > >> > > that we get much from reusing ReplicaFetchThread only to
> > > > > >> > > disable the fetching part.
> > > > > >> > > (b) The leader replica has no ReplicaFetchThread to start
> > > > > >> > > with. It feels weird to start one just for intra broker
> > > > > >> > > data movement.
> > > > > >> > > (c) The ReplicaFetchThread is per broker. Intuitively, the
> > > > > >> > > number of threads doing intra broker data movement should
> > > > > >> > > be related to the number of disks in the broker, not the
> > > > > >> > > number of brokers in the cluster.
> > > > > >> > > (d) If the destination disk fails, we want to stop the
> > > > > >> > > intra broker data movement, but want to continue inter
> > > > > >> > > broker replication. So, logically, it seems it's better to
> > > > > >> > > separate out the two.
> > > > > >> > > (e) I am also not sure if we should reuse the existing
> > > > > >> > > throttling for replication. It's designed to handle
> > > > > >> > > traffic across brokers and the delaying is done in the
> > > > > >> > > fetch request. So, if we are not doing fetching in
> > > > > >> > > ReplicaFetchThread, I am not sure the existing throttling
> > > > > >> > > is effective. Also, when specifying the throttling of
> > > > > >> > > moving data across disks, it seems the user shouldn't care
> > > > > >> > > about whether a replica is a leader or a follower.
> > > > > >> > > Reusing the existing throttling config name will be
> > > > > >> > > awkward in this regard.
> > > > > >> > > (f) It seems it's simpler and more consistent to use a
> > > > > >> > > separate thread pool for local data movement (for both
> > > > > >> > > leader and follower replicas). This process can then be
> > > > > >> > > configured (e.g. number of threads, etc) and throttled
> > > > > >> > > independently.
> > > > > >> > >
> > > > > >> > > 1.3 Yes, we will need some synchronization there. So, if
> > > > > >> > > the movement thread catches up, gets the lock to do the
> > > > > >> > > swap, but realizes that new data is added, it has to
> > > > > >> > > continue catching up while holding the lock?
> > > > > >> > >
> > > > > >> > > 2.3 The benefit of including the desired log directory in
> > > > > >> > > LeaderAndIsrRequest during partition reassignment is that
> > > > > >> > > the controller doesn't need to track the progress for disk
> > > > > >> > > movement. So, you don't need the additional
> > > > > >> > > BrokerDirStateUpdateRequest. Then the controller never
> > > > > >> > > needs to issue ChangeReplicaDirRequest. Only the admin
> > > > > >> > > tool will issue ChangeReplicaDirRequest to move data
> > > > > >> > > within a broker. I agree that this makes
> > > > > >> > > LeaderAndIsrRequest more complicated, but that seems
> > > > > >> > > simpler than changing the controller to track additional
> > > > > >> > > states during partition reassignment.
> > > > > >> > >
> > > > > >> > > 4. We want to make a decision on how to expose the stats.
> > > > > >> > > So far, we are exposing stats like the individual log size
> > > > > >> > > as JMX. So, one way is to just add new JMX metrics to
> > > > > >> > > expose the log directory of individual replicas.
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > >
> > > > > >> > > Jun
> > > > > >> > >
> > > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > Hey Jun,
> > > > > >> > > >
> > > > > >> > > > Thanks for all the comments! Please see my answer below.
> > > > > >> > > > I have updated the KIP to address most of the questions
> > > > > >> > > > and make the KIP easier to understand.
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Dong
> > > > > >> > > >
> > > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi, Dong,
> > > > > >> > > > >
> > > > > >> > > > > Thanks for the KIP. A few comments below.
> > > > > >> > > > >
> > > > > >> > > > > 1. For moving data across directories
> > > > > >> > > > > 1.1 I am not sure why we want to use
> > > > > >> > > > > ReplicaFetcherThread to move data around in the
> > > > > >> > > > > leader. ReplicaFetchThread fetches data from the
> > > > > >> > > > > socket. For moving data locally, it seems that we want
> > > > > >> > > > > to avoid the socket overhead.
> > > > > >> > > > The purpose of using ReplicaFetchThread is to re-use the
> > > > > >> > > > existing thread instead of creating more threads and
> > > > > >> > > > making our thread model more complex. It seems like a
> > > > > >> > > > natural choice for copying data between disks since it
> > > > > >> > > > is similar to copying data between brokers. Another
> > > > > >> > > > reason is that if the replica to be moved is a follower,
> > > > > >> > > > we don't need a lock to swap replicas when the
> > > > > >> > > > destination replica has caught up, since the same thread
> > > > > >> > > > which is fetching data from the leader will swap the
> > > > > >> > > > replica.
> > > > > >> > > >
> > > > > >> > > > The ReplicaFetchThread will not incur socket overhead
> > > > > >> > > > while copying data between disks. It will read directly
> > > > > >> > > > from the source disk (as we do when processing
> > > > > >> > > > FetchRequest) and write to the destination disk (as we
> > > > > >> > > > do when processing ProduceRequest).
> > > > > >> > > >
> > > > > >> > > > > 1.2 I am also not sure about moving data in the
> > > > > >> > > > > ReplicaFetcherThread in the follower. For example, I
> > > > > >> > > > > am not sure setting replica.fetch.max.wait to 0 is
> > > > > >> > > > > ideal. It may not always be effective since a fetch
> > > > > >> > > > > request in the ReplicaFetcherThread could be
> > > > > >> > > > > arbitrarily delayed due to replication throttling on
> > > > > >> > > > > the leader. In general, the data movement logic across
> > > > > >> > > > > disks seems different from that in
> > > > > >> > > > > ReplicaFetcherThread. So, I am not sure why they need
> > > > > >> > > > > to be coupled.
> > > > > >> > > >
> > > > > >> > > > While it may not be the most efficient way to copy data
> > > > > >> > > > between local disks, it will be at least as efficient as
> > > > > >> > > > copying data from the leader to the destination disk.
> > > > > >> > > > The expected goal of KIP-113 is to enable data movement
> > > > > >> > > > between disks with no less efficiency than what we do
> > > > > >> > > > now when moving data between brokers. I think we can
> > > > > >> > > > optimize its performance using a separate thread if the
> > > > > >> > > > performance is not good enough.
> > > > > >> > > >
> > > > > >> > > > > 1.3 Could you add a bit more details on how we swap
> > > > > >> > > > > the replicas when the new ones are fully caught up?
> > > > > >> > > > > For example, what happens when the new replica in the
> > > > > >> > > > > new log directory is caught up, but when we want to do
> > > > > >> > > > > the swap, some new data has arrived?
> > > > > >> > > >
> > > > > >> > > > If the replica is a leader, then ReplicaFetcherThread
> > > > > >> > > > will perform the replacement.
> > > > > >> > > > A proper lock is needed to prevent the
> > > > > >> > > > KafkaRequestHandler from appending data to
> > > > > >> > > > topicPartition.log on the source disk before this
> > > > > >> > > > replacement is completed by the ReplicaFetcherThread.
> > > > > >> > > >
> > > > > >> > > > If the replica is a follower, because the same
> > > > > >> > > > ReplicaFetchThread which fetches data from the leader
> > > > > >> > > > will also swap the replica, no lock is needed.
> > > > > >> > > >
> > > > > >> > > > I have updated the KIP to specify both more explicitly.
> > > > > >> > > >
> > > > > >> > > > > 1.4 Do we need to do the .move at the log segment
> > > > > >> > > > > level or could we just do that at the replica
> > > > > >> > > > > directory level? Renaming just a directory is much
> > > > > >> > > > > faster than renaming the log segments.
> > > > > >> > > >
> > > > > >> > > > Great point. I have updated the KIP to rename the log
> > > > > >> > > > directory instead.
> > > > > >> > > >
> > > > > >> > > > > 1.5 Could you also describe a bit what happens when
> > > > > >> > > > > either the source or the target log directory fails
> > > > > >> > > > > while the data moving is in progress?
> > > > > >> > > >
> > > > > >> > > > If the source log directory fails, then the replica
> > > > > >> > > > movement will stop and the source replica is marked
> > > > > >> > > > offline. If the destination log directory fails, then
> > > > > >> > > > the replica movement will stop. I have updated the KIP
> > > > > >> > > > to clarify this.
> > > > > >> > > >
> > > > > >> > > > > 2. For partition reassignment.
> > > > > >> > > > > 2.1 I am not sure if the controller can block on
> > > > > >> > > > > ChangeReplicaDirRequest. Data movement may take a long
> > > > > >> > > > > time to complete. If there is an outstanding request
> > > > > >> > > > > from the controller to a broker, that broker won't be
> > > > > >> > > > > able to process any new request from the controller.
> > > > > >> > > > > So if another event (e.g. broker failure) happens when
> > > > > >> > > > > the data movement is in progress, subsequent
> > > > > >> > > > > LeaderAndIsrRequest will be delayed.
> > > > > >> > > >
> > > > > >> > > > Yeah good point. I missed the fact that there is only
> > > > > >> > > > one in-flight request from the controller to the broker.
> > > > > >> > > >
> > > > > >> > > > How about I add a request, e.g.
> > > > > >> > > > BrokerDirStateUpdateRequest, which maps topicPartition
> > > > > >> > > > to log directory and can be sent from the broker to the
> > > > > >> > > > controller to indicate completion?
> > > > > >> > > >
> > > > > >> > > > > 2.2 in the KIP, the partition reassignment tool is
> > > > > >> > > > > also used for cases where an admin just wants to
> > > > > >> > > > > balance the existing data across log directories in
> > > > > >> > > > > the broker.
> > > > > >> > > > > In this case, it seems that it's overkill to have the
> > > > > >> > > > > process go through the controller. A simpler approach
> > > > > >> > > > > is to issue an RPC request to the broker directly.
> > > > > >> > > >
> > > > > >> > > > I agree we can optimize this case. It is just that we
> > > > > >> > > > have to add new logic or a code path to handle a
> > > > > >> > > > scenario that is already covered by the more complicated
> > > > > >> > > > scenario. I will add it to the KIP.
> > > > > >> > > >
> > > > > >> > > > > 2.3 When using the partition reassignment tool to move
> > > > > >> > > > > replicas across brokers, it makes sense to be able to
> > > > > >> > > > > specify the log directory of the newly created
> > > > > >> > > > > replicas. The KIP does that in two separate requests
> > > > > >> > > > > ChangeReplicaDirRequest and LeaderAndIsrRequest, and
> > > > > >> > > > > tracks the progress of each independently. An
> > > > > >> > > > > alternative is to do that just in LeaderAndIsrRequest.
> > > > > >> > > > > That way, the new replicas will be created in the
> > > > > >> > > > > right log dir in the first place and the controller
> > > > > >> > > > > just needs to track the progress of partition
> > > > > >> > > > > reassignment in the current way.
> > > > > >> > > >
> > > > > >> > > > I agree it is better to use one request instead of two
> > > > > >> > > > to request replica movement between disks. But I think
> > > > > >> > > > the performance advantage of doing so is negligible
> > > > > >> > > > because we trigger replica reassignment much less often
> > > > > >> > > > than all other kinds of events in the Kafka cluster. I
> > > > > >> > > > am not sure that the benefit of doing this is worth the
> > > > > >> > > > effort to add an optional string field in the
> > > > > >> > > > LeaderAndIsrRequest. Also if we add this optional field
> > > > > >> > > > in the LeaderAndIsrRequest, we probably want to remove
> > > > > >> > > > ChangeReplicaDirRequest to avoid having two requests
> > > > > >> > > > doing the same thing. But it means the user script can
> > > > > >> > > > not send a request directly to the broker to trigger
> > > > > >> > > > replica movement between log directories.
> > > > > >> > > >
> > > > > >> > > > I will do it if you feel strongly about this
> > > > > >> > > > optimization.
> > > > > >> > > >
> > > > > >> > > > > 3. /admin/reassign_partitions: Including the log dir
> > > > > >> > > > > in every replica may not be efficient. We could
> > > > > >> > > > > include a list of log directories and reference the
> > > > > >> > > > > index of the log directory in each replica.
> > > > > >> > > >
> > > > > >> > > > Good point. I have updated the KIP to use this solution.
> > > > > >> > > >
> > > > > >> > > > > 4. DescribeDirsRequest: The stats in the request are
> > > > > >> > > > > already available from JMX.
> > > > > >> > > > > Do we need the new request?
> > > > > >> > > >
> > > > > >> > > > Does JMX also include the state (i.e. offline or online)
> > > > > >> > > > of each log directory and the log directory of each
> > > > > >> > > > replica? If not, then maybe we still need
> > > > > >> > > > DescribeDirsRequest?
> > > > > >> > > >
> > > > > >> > > > > 5. We want to be consistent on ChangeReplicaDirRequest
> > > > > >> > > > > vs ChangeReplicaRequest.
> > > > > >> > > >
> > > > > >> > > > I think ChangeReplicaRequest and ChangeReplicaResponse
> > > > > >> > > > are my typos. Sorry, they are fixed now.
> > > > > >> > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > >
> > > > > >> > > > > Jun
> > > > > >> > > > >
> > > > > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hey Alexey,
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks for all the comments!
> > > > > >> > > > > >
> > > > > >> > > > > > I have updated the KIP to specify how we enforce the
> > > > > >> > > > > > quota. I also updated "The thread model and broker
> > > > > >> > > > > > logic for moving replica data between log
> > > > > >> > > > > > directories" to make it easier to read. You can find
> > > > > >> > > > > > the exact change here
> > > > > >> > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>.
> > > > > >> > > > > > The idea is to use the same replication quota
> > > > > >> > > > > > mechanism introduced in KIP-73.
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Dong
> > > > > >> > > > > >
> > > > > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
> > > > > >> > > > > >
> > > > > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:
> > > > > >> > > > > > > > Hey Alexey,
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Thanks. I think we agreed that the suggested
> > > > > >> > > > > > > > solution doesn't work in general for Kafka
> > > > > >> > > > > > > > users. To answer your questions:
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > 1. I agree we need a quota to rate limit replica
> > > > > >> > > > > > > > movement when a broker is moving a "leader"
> > > > > >> > > > > > > > replica. I will come up with a solution,
> > > > > >> > > > > > > > probably re-using the config of the replication
> > > > > >> > > > > > > > quota introduced in KIP-73.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > 2. Good point. I agree that this is a problem in
> > > > > >> > > > > > > > general.
> > > > > >> > > > > > > > If there is no new data on that broker, with the
> > > > > >> > > > > > > > current default values of
> > > > > >> > > > > > > > replica.fetch.wait.max.ms and
> > > > > >> > > > > > > > replica.fetch.max.bytes, the replica will be
> > > > > >> > > > > > > > moved at only 2 MBps throughput. I think the
> > > > > >> > > > > > > > solution is for the broker to set
> > > > > >> > > > > > > > replica.fetch.wait.max.ms to 0 in its
> > > > > >> > > > > > > > FetchRequest if the corresponding
> > > > > >> > > > > > > > ReplicaFetcherThread needs to move some replica
> > > > > >> > > > > > > > to another disk.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > 3. I have updated the KIP to mention that the
> > > > > >> > > > > > > > read size of a given partition is configured
> > > > > >> > > > > > > > using replica.fetch.max.bytes when we move
> > > > > >> > > > > > > > replicas between disks.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Please see this
> > > > > >> > > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5>
> > > > > >> > > > > > > > for the change of the KIP. I will come up with a
> > > > > >> > > > > > > > solution to throttle replica movement when a
> > > > > >> > > > > > > > broker is moving a "leader" replica.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Thanks. It looks great.
> > > > > >> > > > > > >
> > > > > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
> > > > > >> > > > > > > >
> > > > > >> > > > > > >> 23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:
> > > > > >> > > > > > >> > Thanks. Please see my comment inline.
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> >> 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:
> > > > > >> > > > > > >> >> > Hey Alexey,
> > > > > >> > > > > > >> >> >
> > > > > >> > > > > > >> >> > Thanks for your review and the alternative
> > > > > >> > > > > > >> >> > approach. Here is my understanding of your
> > > > > >> > > > > > >> >> > patch. Kafka's background threads are used
> > > > > >> > > > > > >> >> > to move data between replicas. When data
> > > > > >> > > > > > >> >> > movement is triggered, the log will be
> > > > > >> > > > > > >> >> > rolled and the new logs will be put in the
> > > > > >> > > > > > >> >> > new directory, and background threads will
> > > > > >> > > > > > >> >> > move segments from the old directory to the
> > > > > >> > > > > > >> >> > new directory.
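(To illustrate Dong's point above about replica.fetch.wait.max.ms: the
follower's fetch could be built roughly as below. This is a sketch with
hypothetical names, not the actual patch.)

    // If the fetcher thread also has a replica to move between disks,
    // ask the leader not to block the fetch, so processPartitionData()
    // runs without the replica.fetch.wait.max.ms delay.
    def fetchMaxWaitMs(hasPendingReplicaMove: Boolean,
                       replicaFetchWaitMaxMs: Int): Int =
      if (hasPendingReplicaMove) 0 else replicaFetchWaitMaxMs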
> > > > > >> > > > > > >> >> > It is important to note that KIP-112 is
> > > > > >> > > > > > >> >> > intended to work with KIP-113 to support
> > > > > >> > > > > > >> >> > JBOD. I think your solution is definitely
> > > > > >> > > > > > >> >> > simpler and better under the current Kafka
> > > > > >> > > > > > >> >> > implementation, where a broker will fail if
> > > > > >> > > > > > >> >> > any disk fails. But I am not sure if we want
> > > > > >> > > > > > >> >> > to allow a broker to run with partial disk
> > > > > >> > > > > > >> >> > failure. Let's say a replica is being moved
> > > > > >> > > > > > >> >> > from log_dir_old to log_dir_new and then
> > > > > >> > > > > > >> >> > log_dir_old stops working due to disk
> > > > > >> > > > > > >> >> > failure. How would your existing patch
> > > > > >> > > > > > >> >> > handle it? To make the scenario a bit more
> > > > > >> > > > > > >> >>
> > > > > >> > > > > > >> >> We will lose log_dir_old. After broker restart
> > > > > >> > > > > > >> >> we can read the data from log_dir_new.
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > No, you probably can't. This is because the
> > > > > >> > > > > > >> > broker doesn't have *all* the data for this
> > > > > >> > > > > > >> > partition. For example, say the broker has
> > > > > >> > > > > > >> > partition_segment_1, partition_segment_50 and
> > > > > >> > > > > > >> > partition_segment_100 on log_dir_old.
> > > > > >> > > > > > >> > partition_segment_100, which has the latest
> > > > > >> > > > > > >> > data, has been moved to log_dir_new, and
> > > > > >> > > > > > >> > log_dir_old fails before partition_segment_50
> > > > > >> > > > > > >> > and partition_segment_1 are moved to
> > > > > >> > > > > > >> > log_dir_new. When the broker re-starts, it
> > > > > >> > > > > > >> > won't have partition_segment_50. This causes a
> > > > > >> > > > > > >> > problem if the broker is elected leader and a
> > > > > >> > > > > > >> > consumer wants to consume data in
> > > > > >> > > > > > >> > partition_segment_1.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> Right.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> >> > complicated, let's say the broker is shut
> > > > > >> > > > > > >> >> > down, log_dir_old's disk fails, and the
> > > > > >> > > > > > >> >> > broker starts. In this case the broker
> > > > > >> > > > > > >> >> > doesn't even know if log_dir_new has all the
> > > > > >> > > > > > >> >> > data needed for this replica.
> > > > > >> > > > > > >> >> > It becomes a problem if the broker is
> > > > > >> > > > > > >> >> > elected leader of this partition in this
> > > > > >> > > > > > >> >> > case.
> > > > > >> > > > > > >> >>
> > > > > >> > > > > > >> >> log_dir_new contains the most recent data so
> > > > > >> > > > > > >> >> we will lose the tail of the partition. This
> > > > > >> > > > > > >> >> is not a big problem for us because we already
> > > > > >> > > > > > >> >> delete tails by hand (see
> > > > > >> > > > > > >> >> https://issues.apache.org/jira/browse/KAFKA-1712).
> > > > > >> > > > > > >> >> Also we don't use automatic leader balancing
> > > > > >> > > > > > >> >> (auto.leader.rebalance.enable=false), so this
> > > > > >> > > > > > >> >> partition becomes the leader with a low
> > > > > >> > > > > > >> >> probability. I think my patch can be modified
> > > > > >> > > > > > >> >> to prohibit the selection of the leader until
> > > > > >> > > > > > >> >> the partition has moved completely.
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > I guess you are saying that you have deleted
> > > > > >> > > > > > >> > the tails by hand in your own Kafka branch. But
> > > > > >> > > > > > >> > KAFKA-1712 is not accepted into Kafka trunk and
> > > > > >> > > > > > >> > I am not
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> No. We just modify segment mtimes with a cron
> > > > > >> > > > > > >> job. This works with vanilla Kafka.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> > sure if it is the right solution. How would
> > > > > >> > > > > > >> > this solution address the problem mentioned
> > > > > >> > > > > > >> > above?
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> If you need only fresh data and if you remove old
> > > > > >> > > > > > >> data by hand this is not a problem. But in the
> > > > > >> > > > > > >> general case this is a problem of course.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> > BTW, I am not sure the solution mentioned in
> > > > > >> > > > > > >> > KAFKA-1712 is the right way to address its
> > > > > >> > > > > > >> > problem. Now that we have a timestamp in the
> > > > > >> > > > > > >> > message we can use that to delete old segments
> > > > > >> > > > > > >> > instead of relying on the log segment mtime.
> > > > > >> > > > > > >> > Just some idea and we don't have to discuss
> > > > > >> > > > > > >> > this problem here.
> > > > > >> > > > > > >> >> > The solution presented in the KIP attempts
> > > > > >> > > > > > >> >> > to handle it by replacing the replica in an
> > > > > >> > > > > > >> >> > atomic fashion after the log in the new dir
> > > > > >> > > > > > >> >> > has fully caught up with the log in the old
> > > > > >> > > > > > >> >> > dir. At any time the log can be considered
> > > > > >> > > > > > >> >> > to exist in only one log directory.
> > > > > >> > > > > > >> >>
> > > > > >> > > > > > >> >> As I understand it, your solution does not
> > > > > >> > > > > > >> >> cover quotas. What happens if someone starts
> > > > > >> > > > > > >> >> to transfer 100 partitions?
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > Good point. Quota can be implemented in the
> > > > > >> > > > > > >> > future. It is currently mentioned as a
> > > > > >> > > > > > >> > potential future improvement in KIP-112
> > > > > >> > > > > > >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>.
> > > > > >> > > > > > >> > Thanks for the reminder. I will move it to
> > > > > >> > > > > > >> > KIP-113.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> >> > If yes, it will read a ByteBufferMessageSet
> > > > > >> > > > > > >> >> > from topicPartition.log and append the
> > > > > >> > > > > > >> >> > message set to topicPartition.move
> > > > > >> > > > > > >> >>
> > > > > >> > > > > > >> >> i.e. processPartitionData will read data from
> > > > > >> > > > > > >> >> the beginning of topicPartition.log? What is
> > > > > >> > > > > > >> >> the read size? A ReplicaFetchThread reads many
> > > > > >> > > > > > >> >> partitions so if one does some complicated
> > > > > >> > > > > > >> >> work (= reads a lot of data from disk)
> > > > > >> > > > > > >> >> everything will slow down. I think the read
> > > > > >> > > > > > >> >> size should not be very big.
> > > > > >> > > > > > >> >>
> > > > > >> > > > > > >> >> On the other hand at this point
> > > > > >> > > > > > >> >> (processPartitionData) one can use only the
> > > > > >> > > > > > >> >> new data (the ByteBufferMessageSet from the
> > > > > >> > > > > > >> >> parameters) and wait until
> > > > > >> > > > > > >> >> (topicPartition.move.smallestOffset <=
> > > > > >> > > > > > >> >> topicPartition.log.smallestOffset &&
> > > > > >> > > > > > >> >> topicPartition.move.largestOffset ==
> > > > > >> > > > > > >> >> topicPartition.log.largestOffset). In this
> > > > > >> > > > > > >> >> case the write speed to topicPartition.move
> > > > > >> > > > > > >> >> and topicPartition.log will be the same so
> > > > > >> > > > > > >> >> this will allow us to move many partitions to
> > > > > >> > > > > > >> >> one disk.
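(To make the caught-up check above concrete, a minimal sketch follows. It
assumes the lock-based swap from point 1.3; LogView and maybeSwap are
hypothetical names, not the real kafka.log.Log API.)

    import java.util.concurrent.locks.ReentrantLock

    // A stand-in exposing just the two offsets the check needs.
    case class LogView(smallestOffset: Long, largestOffset: Long)

    // Returns true if the .move log had caught up and was swapped in.
    // Holding the lock blocks request handler / fetcher threads from
    // appending to the old log while the swap happens.
    def maybeSwap(lock: ReentrantLock, move: LogView, log: LogView)
                 (swap: () => Unit): Boolean = {
      lock.lock()
      try {
        val caughtUp = move.smallestOffset <= log.smallestOffset &&
                       move.largestOffset == log.largestOffset
        if (caughtUp) swap()
        caughtUp
      } finally lock.unlock()
    }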
> > > > > >> > > > > > >> > The read size of a given partition is
> > > > > >> > > > > > >> > configured using replica.fetch.max.bytes, which
> > > > > >> > > > > > >> > is the same size used by the FetchRequest from
> > > > > >> > > > > > >> > follower to leader. If the broker is moving a
> > > > > >> > > > > > >> > replica for which it
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> OK. Could you mention it in the KIP?
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> > acts as a follower, the disk write rate for
> > > > > >> > > > > > >> > moving this replica is at most the rate it
> > > > > >> > > > > > >> > fetches from the leader (assume it is catching
> > > > > >> > > > > > >> > up and has sufficient data to read from the
> > > > > >> > > > > > >> > leader), which is subject to the round-trip
> > > > > >> > > > > > >> > time between itself and the leader. Thus this
> > > > > >> > > > > > >> > part is probably fine even without quota.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> I think there are 2 problems
> > > > > >> > > > > > >> 1. Without a speed limiter this will not work
> > > > > >> > > > > > >> well even for 1 partition. In our production we
> > > > > >> > > > > > >> had a problem so we did the throughput limiter:
> > > > > >> > > > > > >> https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> 2. I don't understand how it will work in the
> > > > > >> > > > > > >> case of a big replica.fetch.wait.max.ms and a
> > > > > >> > > > > > >> partition with irregular flow. For example
> > > > > >> > > > > > >> someone could have
> > > > > >> > > > > > >> replica.fetch.wait.max.ms=10 minutes and a
> > > > > >> > > > > > >> partition that has very high data flow from 12:00
> > > > > >> > > > > > >> to 13:00 and zero flow otherwise. In this case
> > > > > >> > > > > > >> processPartitionData could be called once per 10
> > > > > >> > > > > > >> minutes so if we start data moving at 13:01 it
> > > > > >> > > > > > >> will not be finished until the next day.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> > But if the broker is moving a replica for which
> > > > > >> > > > > > >> > it acts as a leader, as of the current KIP the
> > > > > >> > > > > > >> > broker will keep reading from log_dir_old and
> > > > > >> > > > > > >> > appending to log_dir_new without having to wait
> > > > > >> > > > > > >> > for the round-trip time. We probably need quota
> > > > > >> > > > > > >> > for this in the future.
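(On the speed limiter: a minimal average-rate throttle for the
intra-broker move thread could look like the sketch below. This is
illustrative only and is neither the limiter from the linked commit nor
the KIP-73 mechanism.)

    // The move thread calls acquire(chunkSize) before each write;
    // acquire sleeps just long enough that the average rate stays
    // under bytesPerSec.
    class MoveThrottle(bytesPerSec: Long) {
      private val startMs = System.currentTimeMillis()
      private var totalBytes = 0L

      def acquire(bytes: Long): Unit = synchronized {
        totalBytes += bytes
        val minElapsedMs = totalBytes * 1000 / bytesPerSec
        val actualElapsedMs = System.currentTimeMillis() - startMs
        if (minElapsedMs > actualElapsedMs)
          Thread.sleep(minElapsedMs - actualElapsedMs)
      }
    }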
> > > > > >> > > > > > >> >> > And to answer your question, yes
> > > > > >> > > > > > >> >> > topicPartition.log refers to
> > > > > >> > > > > > >> >> > topic-partition/segment.log.
> > > > > >> > > > > > >> >> >
> > > > > >> > > > > > >> >> > Thanks,
> > > > > >> > > > > > >> >> > Dong
> > > > > >> > > > > > >> >> >
> > > > > >> > > > > > >> >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
> > > > > >> > > > > > >> >> >
> > > > > >> > > > > > >> >> >> Hi,
> > > > > >> > > > > > >> >> >>
> > > > > >> > > > > > >> >> >> We have a similar solution that has been
> > > > > >> > > > > > >> >> >> working in production since 2014. You can
> > > > > >> > > > > > >> >> >> see it here:
> > > > > >> > > > > > >> >> >> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> > > > > >> > > > > > >> >> >> The idea is very simple
> > > > > >> > > > > > >> >> >> 1. Disk balancer runs in a separate thread
> > > > > >> > > > > > >> >> >> inside the scheduler pool.
> > > > > >> > > > > > >> >> >> 2. It does not touch empty partitions
> > > > > >> > > > > > >> >> >> 3. Before it moves a partition it forcibly
> > > > > >> > > > > > >> >> >> creates a new segment on the destination
> > > > > >> > > > > > >> >> >> disk
> > > > > >> > > > > > >> >> >> 4. It moves segment by segment from new to
> > > > > >> > > > > > >> >> >> old.
> > > > > >> > > > > > >> >> >> 5. The Log class works with segments on
> > > > > >> > > > > > >> >> >> both disks
> > > > > >> > > > > > >> >> >>
> > > > > >> > > > > > >> >> >> Your approach seems too complicated,
> > > > > >> > > > > > >> >> >> moreover it means that you have to patch
> > > > > >> > > > > > >> >> >> different components of the system.
> > > > > >> > > > > > >> >> >> Could you clarify what you mean by
> > > > > >> > > > > > >> >> >> topicPartition.log? Is it
> > > > > >> > > > > > >> >> >> topic-partition/segment.log ?
> > > > > >> > > > > > >> >> >>
> > > > > >> > > > > > >> >> >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
> > > > > >> > > > > > >> >> >> > Hi all,
> > > > > >> > > > > > >> >> >> >
> > > > > >> > > > > > >> >> >> > We created KIP-113: Support replicas
> > > > > >> > > > > > >> >> >> > movement between log directories. Please
> > > > > >> > > > > > >> >> >> > find the KIP wiki in the link
> > > > > >> > > > > > >> >> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
> > > > > >> > > > > > >> >> >> >
> > > > > >> > > > > > >> >> >> > This KIP is related to KIP-112
> > > > > >> > > > > > >> >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> > > > > >> > > > > > >> >> >> > Handle disk failure for JBOD.
> > > > > >> > > > > > >> >> >> > They are needed in order to support JBOD
> > > > > >> > > > > > >> >> >> > in Kafka. Please help review the KIP.
> > > > > >> > > > > > >> >> >> > Your feedback is appreciated!
> > > > > >> > > > > > >> >> >> >
> > > > > >> > > > > > >> >> >> > Thanks,
> > > > > >> > > > > > >> >> >> > Dong