Hi, Dong, Ok, so it seems that with solution (2), if the tool exits successfully, then we know for sure that all replicas will be in the right log dirs, whereas solution (1) doesn't guarantee that. That seems better, so we can go with your current solution.
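To make the agreed flow concrete, here is a rough sketch of what the tool-side logic could look like: send ChangeReplicaDirRequest before writing the reassignment path in ZK, then retry while brokers answer ReplicaNotAvailableException, up to the tool's --timeout. All class and method names below are hypothetical placeholders for the requests named in this thread, not an existing Kafka API.

// A minimal, hypothetical sketch of the "execute" flow in solution (2).
// None of these types exist in Kafka; they only stand in for the requests discussed above.
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class ReassignExecuteSketch {

    /** Placeholder for a client that sends ChangeReplicaDirRequest to the relevant brokers. */
    interface DirClient {
        /** Returns the replicas whose response was ReplicaNotAvailableException (not created yet). */
        Map<String, String> changeReplicaDirs(Map<String, String> replicaToLogDir);
    }

    /** Placeholder for writing the reassignment path in ZK. */
    interface ReassignmentZk {
        void writeReassignmentPath(Set<String> replicas);
    }

    public void execute(DirClient client, ReassignmentZk zk,
                        Map<String, String> replicaToLogDir, long timeoutMs)
            throws InterruptedException {
        // 1. Send the desired replica -> log dir mapping first, so each broker remembers it
        //    in memory before the controller's LeaderAndIsrRequest arrives.
        Map<String, String> pending = client.changeReplicaDirs(replicaToLogDir);

        // 2. Write the reassignment path in ZK to kick off the cross-broker movement.
        zk.writeReassignmentPath(replicaToLogDir.keySet());

        // 3. Retry ChangeReplicaDirRequest (with backoff) for replicas not created yet,
        //    until success or until the tool's --timeout (10 seconds by default) expires.
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!pending.isEmpty() && System.currentTimeMillis() < deadline) {
            TimeUnit.MILLISECONDS.sleep(500);
            pending = client.changeReplicaDirs(pending);
        }
        if (!pending.isEmpty()) {
            throw new IllegalStateException("Replicas not yet in the requested log dirs: " + pending.keySet());
        }
    }
}

If the tool returns from this loop without an error, every broker has acknowledged the target log dir for its replicas, which is why solution (2) gives the guarantee discussed above.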
Thanks, Jun On Fri, Mar 24, 2017 at 4:28 PM, Dong Lin <lindon...@gmail.com> wrote: > Hey Jun, > > No.. the current approach described in the KIP (see here > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113% > 3A+Support+replicas+movement+between+log+directories#KIP- > 113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreas > signreplicabetweenlogdirectoriesacrossbrokers>) > also sends ChangeReplicaDirRequest before writing reassignment path in ZK. > I think we are discussing whether ChangeReplicaDirResponse (1) shows success or > (2) should specify ReplicaNotAvailableException, if replica has not been > created yet. > > Since both solutions will send ChangeReplicaDirRequest before writing > reassignment in ZK, their chance of creating replica in the right directory > is the same. > > To take care of the rarer case that some brokers go down immediately after > the reassignment tool is run, solution (1) requires reassignment tool to > repeatedly send DescribeDirRequest and ChangeReplicaDirRequest, while > solution (2) requires tool to only retry ChangeReplicaDirRequest if the > response says ReplicaNotAvailableException. It seems that solution (2) is > cleaner because ChangeReplicaDirRequest won't depend on DescribeDirRequest. > What do you think? > > Thanks, > Dong > > > On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao <j...@confluent.io> wrote: > > > Hi, Dong, > > > > We are just comparing whether it's better for the reassignment tool to > > send ChangeReplicaDirRequest > > (1) before or (2) after writing the reassignment path in ZK . > > > > In the case when all brokers are alive when the reassignment tool is run, > > (1) guarantees 100% that the new replicas will be in the right log dirs > and > > (2) can't. > > > > In the rarer case that some brokers go down immediately after the > > reassignment tool is run, in either approach, there is a chance when the > > failed broker comes back, it will complete the pending reassignment > process > > by putting some replicas in the wrong log dirs. > > > > Implementation wise, (1) and (2) seem to be the same. So, it seems to me > > that (1) is better? > > > > Thanks, > > > > Jun > > > > > > On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin <lindon...@gmail.com> wrote: > > > > > Hey Jun, > > > > > > Thanks much for the response! I agree with you that if multiple > replicas > > > are created in the wrong directory, we may waste resource if either > > > replicaMoveThread number is low or intra.broker.throttled.rate is slow. > > > Then the question is whether the suggested approach increases the > chance > > of > > > replica being created in the correct log directory. > > > > > > I think the answer is no due to the argument provided in the previous > > > email. Sending ChangeReplicaDirRequest before updating znode has > > negligible > > > impact on the chance that the broker processes ChangeReplicaDirRequest > > > before LeaderAndIsrRequest from controller. If we still worry about the > > > order they are sent, the reassignment tool can first send > > > ChangeReplicaDirRequest (so that broker remembers it in memory), create > > > reassignment znode, and then retry ChangeReplicaDirRequest if the > > previous > > > ChangeReplicaDirResponse says the replica has not been created. This > > should > > > give us the highest possible chance of creating replica in the correct > > > directory and avoid the problem of the suggested approach. 
I have > updated > > > "How > > > to reassign replica between log directories across brokers" in the KIP > to > > > explain this procedure. > > > > > > To answer your question, the reassignment tool should fail with with > > proper > > > error message if user has specified log directory for a replica on an > > > offline broker. This is reasonable because reassignment tool can not > > > guarantee that the replica will be moved to the specified log directory > > if > > > the broker is offline. If all brokers are online, the reassignment tool > > may > > > hung up to 10 seconds (by default) to retry ChangeReplicaDirRequest if > > any > > > replica has not been created already. User can change this timeout > value > > > using the newly-added --timeout argument of the reassignment tool. This > > is > > > specified in the Public Interface section in the KIP. The reassignment > > tool > > > will only block if user uses this new feature of reassigning replica > to a > > > specific log directory in the broker. Therefore it seems backward > > > compatible. > > > > > > Does this address the concern? > > > > > > Thanks, > > > Dong > > > > > > On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao <j...@confluent.io> wrote: > > > > > > > Hi, Dong, > > > > > > > > 11.2 I think there are a few reasons why the cross disk movement may > > not > > > > catch up if the replicas are created in the wrong log dirs to start > > with. > > > > (a) There could be more replica fetcher threads than the disk > movement > > > > threads. (b) intra.broker.throttled.rate may be configured lower than > > the > > > > replica throttle rate. That's why I think getting the replicas > created > > in > > > > the right log dirs will be better. > > > > > > > > For the corner case issue that you mentioned, I am not sure if the > > > approach > > > > in the KIP completely avoids that. If a broker is down when the > > partition > > > > reassignment tool is started, does the tool just hang (keep retrying > > > > ChangeReplicaDirRequest) until the broker comes back? Currently, the > > > > partition reassignment tool doesn't block. > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > > > > > On Tue, Mar 21, 2017 at 11:24 AM, Dong Lin <lindon...@gmail.com> > > wrote: > > > > > > > > > Hey Jun, > > > > > > > > > > Thanks for the explanation. Please see below my thoughts. > > > > > > > > > > 10. I see. So you are concerned with the potential implementation > > > > > complexity which I wasn't aware of. I think it is OK not to do log > > > > > cleaning on the .move log since there can be only one such log in > > each > > > > > directory. I have updated the KIP to specify this: > > > > > > > > > > "The log segments in topicPartition.move directory will be subject > to > > > log > > > > > truncation, log retention in the same way as the log segments in > the > > > > source > > > > > log directory. But we may not do log cleaning on the > > > topicPartition.move > > > > to > > > > > simplify the implementation." > > > > > > > > > > 11.2 Now I get your point. I think we have slightly different > > > expectation > > > > > of the order in which the reassignment tools updates reassignment > > node > > > in > > > > > ZK and sends ChangeReplicaDirRequest. > > > > > > > > > > I think the reassignment tool should first create reassignment > znode > > > and > > > > > then keep sending ChangeReplicaDirRequest until success. 
I think > > > sending > > > > > ChangeReplicaDirRequest before updating znode has negligible impact > > on > > > > the > > > > > chance that the broker processes ChangeReplicaDirRequest before > > > > > LeaderAndIsrRequest from controller, because the time for > controller > > to > > > > > receive ZK notification, handle state machine changes and send > > > > > LeaderAndIsrRequests should be much longer than the time for > > > reassignment > > > > > tool to setup connection with broker and send > > ChangeReplicaDirRequest. > > > > Even > > > > > if broker receives LeaderAndIsrRequest a bit sooner, the data in > the > > > > > original replica should be smaller enough for .move log to catch up > > > very > > > > > quickly, so that broker can swap the log soon after it receives > > > > > ChangeReplicaDirRequest -- otherwise the > intra.broker.throttled.rate > > is > > > > > probably too small. Does this address your concern with the > > > performance? > > > > > > > > > > One concern with the suggested approach is that the > > > > ChangeReplicaDirRequest > > > > > may be lost if broker crashes before it creates the replica. I > agree > > it > > > > is > > > > > rare. But it will be confusing when it happens. Operators would > have > > to > > > > > keep verifying reassignment and possibly retry execution until > > success > > > if > > > > > they want to make sure that the ChangeReplicaDirRequest is > executed. > > > > > > > > > > Thanks, > > > > > Dong > > > > > > > > > > > > > > > > > > > > On Tue, Mar 21, 2017 at 8:37 AM, Jun Rao <j...@confluent.io> wrote: > > > > > > > > > > > Hi, Dong, > > > > > > > > > > > > 10. I was mainly concerned about the additional complexity needed > > to > > > > > > support log cleaning in the .move log. For example, LogToClean is > > > keyed > > > > > off > > > > > > TopicPartition. To be able to support cleaning different > instances > > of > > > > the > > > > > > same partition, we need additional logic. I am not how much > > > additional > > > > > > complexity is needed and whether it's worth it. If we don't do > log > > > > > cleaning > > > > > > at all on the .move log, then we don't have to change the log > > > cleaner's > > > > > > code. > > > > > > > > > > > > 11.2 I was thinking of the following flow. In the execute phase, > > the > > > > > > reassignment tool first issues a ChangeReplicaDirRequest to > brokers > > > > where > > > > > > new replicas will be created. The brokers remember the mapping > and > > > > > return a > > > > > > successful code. The reassignment tool then initiates the cross > > > broker > > > > > > movement through the controller. In the verify phase, in addition > > to > > > > > > checking the replica assignment at the brokers, it issues > > > > > > DescribeDirsRequest to check the replica to log dirs mapping. For > > > each > > > > > > partition in the response, the broker returns a state to indicate > > > > whether > > > > > > the replica is final, temporary or pending. If all replicas are > in > > > the > > > > > > final state, the tool checks if all replicas are in the expected > > log > > > > > dirs. > > > > > > If they are not, output a warning (and perhaps suggest the users > to > > > > move > > > > > > the data again). However, this should be rare. > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin <lindon...@gmail.com> > > > > wrote: > > > > > > > > > > > > > Hey Jun, > > > > > > > > > > > > > > Thanks for the response! 
It seems that we have only two > remaining > > > > > issues. > > > > > > > Please see my reply below. > > > > > > > > > > > > > > On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao <j...@confluent.io> > > wrote: > > > > > > > > > > > > > > > Hi, Dong, > > > > > > > > > > > > > > > > Thanks for the update. A few replies inlined below. > > > > > > > > > > > > > > > > On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin < > > lindon...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > > > > Hey Jun, > > > > > > > > > > > > > > > > > > Thanks for your comment! Please see my reply below. > > > > > > > > > > > > > > > > > > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi, Dong, > > > > > > > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > > > > > > > 10. Could you comment on that? > > > > > > > > > > > > > > > > > > > > > > > > > > > > Sorry, I missed that comment. > > > > > > > > > > > > > > > > > > Good point. I think the log segments in topicPartition.move > > > > > directory > > > > > > > > will > > > > > > > > > be subject to log truncation, log retention and log > cleaning > > in > > > > the > > > > > > > same > > > > > > > > > way as the log segments in the source log directory. I just > > > > > specified > > > > > > > > this > > > > > > > > > inthe KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > This is ok, but doubles the overhead of log cleaning. We > > probably > > > > > want > > > > > > to > > > > > > > > think a bit more on this. > > > > > > > > > > > > > > > > > > > > > > I think this is OK because the number of replicas that are > being > > > > moved > > > > > is > > > > > > > limited by the number of ReplicaMoveThread. The default number > of > > > > > > > ReplicaMoveThread is the number of log directories, which mean > we > > > > incur > > > > > > > these overhead for at most one replica per log directory at any > > > time. > > > > > > > Suppose there are most than 100 replica in any log directory, > the > > > > > > increase > > > > > > > in overhead is less than 1%. > > > > > > > > > > > > > > Another way to look at this is that this is no worse than > replica > > > > > > > reassignment. When we reassign replica from one broker to > > another, > > > we > > > > > > will > > > > > > > double the overhread of log cleaning in the cluster for this > > > replica. > > > > > If > > > > > > we > > > > > > > are OK with this then we are OK with replica movement between > log > > > > > > > directories. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 11.2 "I am concerned that the ChangeReplicaDirRequest > would > > > be > > > > > lost > > > > > > > if > > > > > > > > > > broker > > > > > > > > > > restarts after it sends ChangeReplicaDirResponse but > before > > > it > > > > > > > receives > > > > > > > > > > LeaderAndIsrRequest." > > > > > > > > > > > > > > > > > > > > In that case, the reassignment tool could detect that > > through > > > > > > > > > > DescribeDirsRequest > > > > > > > > > > and issue ChangeReplicaDirRequest again, right? In the > > common > > > > > case, > > > > > > > > this > > > > > > > > > is > > > > > > > > > > probably not needed and we only need to write each > replica > > > > once. 
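As a rough illustration of that detect-and-reissue alternative (solution 1): the verify phase would inspect a DescribeDirsResponse and warn, or reissue ChangeReplicaDirRequest, for replicas that are final but in an unexpected log dir. The types below are hypothetical placeholders; the state values mirror the final/temporary/pending states mentioned above.

// Hypothetical sketch of the solution (1) verify step based on DescribeDirsRequest.
import java.util.Map;

public class ReassignVerifySketch {

    enum ReplicaState { FINAL, TEMPORARY, PENDING }

    /** Placeholder for one partition entry of a DescribeDirsResponse. */
    record ReplicaStatus(String logDir, ReplicaState state) {}

    /** Returns true only when every replica is final and already in its expected log dir. */
    public boolean verify(Map<String, ReplicaStatus> described,   // replica -> observed status
                          Map<String, String> expectedLogDirs) {  // replica -> expected log dir
        boolean allGood = true;
        for (Map.Entry<String, String> e : expectedLogDirs.entrySet()) {
            ReplicaStatus status = described.get(e.getKey());
            if (status == null || status.state() != ReplicaState.FINAL) {
                System.out.println(e.getKey() + ": movement still in progress");
                allGood = false;
            } else if (!status.logDir().equals(e.getValue())) {
                // Rare case: the replica was created in the wrong dir; the tool could
                // reissue ChangeReplicaDirRequest or suggest moving the data again.
                System.out.println(e.getKey() + ": in " + status.logDir() + " instead of " + e.getValue());
                allGood = false;
            }
        }
        return allGood;
    }
}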
> > > > > > > > > > > > > > > > > > > > My main concern with the approach in the current KIP is > > that > > > > > once a > > > > > > > new > > > > > > > > > > replica is created in the wrong log dir, the cross log > > > > directory > > > > > > > > movement > > > > > > > > > > may not catch up until the new replica is fully > > bootstrapped. > > > > So, > > > > > > we > > > > > > > > end > > > > > > > > > up > > > > > > > > > > writing the data for the same replica twice. > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree with your concern. My main concern is that it is a > > bit > > > > > weird > > > > > > if > > > > > > > > > ChangeReplicaDirResponse can not guarantee success and the > > tool > > > > > needs > > > > > > > to > > > > > > > > > rely on DescribeDirResponse to see if it needs to send > > > > > > > > > ChangeReplicaDirRequest again. > > > > > > > > > > > > > > > > > > How about this: If broker doesn't not have already replica > > > > created > > > > > > for > > > > > > > > the > > > > > > > > > specified topicParition when it receives > > > ChangeReplicaDirRequest, > > > > > it > > > > > > > will > > > > > > > > > reply ReplicaNotAvailableException AND remember (replica, > > > > > destination > > > > > > > log > > > > > > > > > directory) pair in memory to create the replica in the > > > specified > > > > > log > > > > > > > > > directory. > > > > > > > > > > > > > > > > > > > > > > > > > > I am not sure if returning ReplicaNotAvailableException is > > > useful? > > > > > What > > > > > > > > will the client do on receiving ReplicaNotAvailableException > in > > > > this > > > > > > > case? > > > > > > > > > > > > > > > > Perhaps we could just replace the is_temporary field in > > > > > > > > DescribeDirsRresponsePartition with a state field. We can > use 0 > > > to > > > > > > > indicate > > > > > > > > the partition is created, 1 to indicate the partition is > > > temporary > > > > > and > > > > > > 2 > > > > > > > to > > > > > > > > indicate that the partition is pending. > > > > > > > > > > > > > > > > > > > > > > ReplicaNotAvailableException is useful because the client can > > > re-send > > > > > > > ChangeReplicaDirRequest (with backoff) after receiving > > > > > > > ReplicaNotAvailableException in the response. > > > ChangeReplicaDirRequest > > > > > > will > > > > > > > only succeed after replica has been created for the specified > > > > partition > > > > > > in > > > > > > > the broker. > > > > > > > > > > > > > > I think this is cleaner than asking reassignment tool to detect > > > that > > > > > > > through DescribeDirsRequest and issue ChangeReplicaDirRequest > > > again. > > > > > Both > > > > > > > solution has the same chance of writing the data for the same > > > replica > > > > > > > twice. In the original solution, the reassignment tool will > keep > > > > > retrying > > > > > > > ChangeReplicaDirRequest until success. In the second suggested > > > > > solution, > > > > > > > the reassignment tool needs to send ChangeReplicaDirRequest, > send > > > > > > > DescribeDirsRequest to verify result, and retry > > > > ChangeReplicaDirRequest > > > > > > and > > > > > > > DescribeDirsRequest again if the replica hasn't been created > > > already. > > > > > > Thus > > > > > > > the second solution couples ChangeReplicaDirRequest with > > > > > > > DescribeDirsRequest and makes tool's logic is bit more > > complicated. > > > > > > > > > > > > > > Besides, I am not sure I understand your suggestion for > > > is_temporary > > > > > > field. 
> > > > > > > It seems that a replica can have only two states, i.e. normal > it > > is > > > > > being > > > > > > > used to serve fetch/produce requests and temporary if it is a > > > replica > > > > > is > > > > > > > that catching up with the normal one. If you think we should > have > > > > > > > reassignment tool send DescribeDirsRequest before retrying > > > > > > > ChangeReplicaDirRequest, can you elaborate a bit what is the > > > > "pending" > > > > > > > state? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 11.3 Are you saying the value in --throttle will be used > to > > > set > > > > > > both > > > > > > > > > > intra.broker.throttled.rate and > > leader.follower.replication. > > > > > > > > > > throttled.replicas? > > > > > > > > > > > > > > > > > > > > > > > > > > > > No. --throttle will be used to only to set > > > > > > leader.follower.replication > > > > > > > as > > > > > > > > > it does now. I think we do not need any option in the > > > > > > > > > kafka-reassignment-partitions.sh to specify > > > > > > > intra.broker.throttled.rate. > > > > > > > > > User canset it in broker config or dynamically using > > > > > kafka-config.sh. > > > > > > > > Does > > > > > > > > > this sound OK? > > > > > > > > > > > > > > > > > > > > > > > > > > Ok. This sounds good. It would be useful to make this clear > in > > > the > > > > > > wiki. > > > > > > > > > > > > > > > > Sure. I have updated the wiki to specify this: "the quota > > > specified > > > > > by > > > > > > > the > > > > > > > argument `–throttle` will be applied to only inter-broker > replica > > > > > > > reassignment. It does not affect the quota for replica movement > > > > between > > > > > > log > > > > > > > directories". > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 12.2 If the user only wants to check one topic, the tool > > > could > > > > do > > > > > > the > > > > > > > > > > filtering on the client side, right? My concern with > having > > > > both > > > > > > > > log_dirs > > > > > > > > > > and topics is the semantic. For example, if both are not > > > empty, > > > > > do > > > > > > we > > > > > > > > > > return the intersection or the union? > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes the tool could filter on the client side. But the > purpose > > > of > > > > > > having > > > > > > > > > this field is to reduce response side in case broker has a > > lot > > > of > > > > > > > topics. > > > > > > > > > The both fields are used as filter and the result is > > > > intersection. > > > > > Do > > > > > > > you > > > > > > > > > think this semantic is confusing or counter-intuitive? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Ok. Could we document the semantic when both dirs and topics > > are > > > > > > > specified? > > > > > > > > > > > > > > > > > > > > > > Sure. I have updated the wiki to specify this: "log_dirs and > > topics > > > > are > > > > > > > used to filter the results to include only the specified > > > > log_dir/topic. > > > > > > The > > > > > > > result is the intersection of both filters". 
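A small sketch of those documented filter semantics, as a broker might apply them when building a DescribeDirsResponse: a null list means "no filter", so the result is the intersection of the log_dirs filter and the topics filter. The shapes below are illustrative assumptions, not the actual wire format.

// Hypothetical sketch of the DescribeDirsRequest filter semantics discussed above.
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DescribeDirsFilterSketch {

    // dirContents: logDir -> topic -> partition ids (the broker's local view)
    public Map<String, Map<String, List<Integer>>> filter(
            Map<String, Map<String, List<Integer>>> dirContents,
            List<String> logDirs,    // null => include all log dirs
            List<String> topics) {   // null => include all topics
        Map<String, Map<String, List<Integer>>> result = new TreeMap<>();
        for (var dirEntry : dirContents.entrySet()) {
            if (logDirs != null && !logDirs.contains(dirEntry.getKey())) continue;
            Map<String, List<Integer>> filteredTopics = new TreeMap<>();
            for (var topicEntry : dirEntry.getValue().entrySet()) {
                if (topics != null && !topics.contains(topicEntry.getKey())) continue;
                filteredTopics.put(topicEntry.getKey(), topicEntry.getValue());
            }
            result.put(dirEntry.getKey(), filteredTopics);
        }
        return result; // only entries passing both filters survive (intersection)
    }
}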
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin < > > > lindon...@gmail.com > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hey Jun, > > > > > > > > > > > > > > > > > > > > > > Thanks much for your detailed comments. Please see my > > reply > > > > > > below. > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao < > > j...@confluent.io > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi, Dong, > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the updated KIP. Some more comments below. > > > > > > > > > > > > > > > > > > > > > > > > 10. For the .move log, do we perform any segment > > deletion > > > > > > (based > > > > > > > on > > > > > > > > > > > > retention) or log cleaning (if a compacted topic)? Or > > do > > > we > > > > > > only > > > > > > > > > enable > > > > > > > > > > > > that after the swap? > > > > > > > > > > > > > > > > > > > > > > > > 11. kafka-reassign-partitions.sh > > > > > > > > > > > > 11.1 If all reassigned replicas are in the current > > broker > > > > and > > > > > > > only > > > > > > > > > the > > > > > > > > > > > log > > > > > > > > > > > > directories have changed, we can probably optimize > the > > > tool > > > > > to > > > > > > > not > > > > > > > > > > > trigger > > > > > > > > > > > > partition reassignment through the controller and > only > > > > > > > > > > > > send ChangeReplicaDirRequest. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, the reassignment script should not create the > > > > reassignment > > > > > > > znode > > > > > > > > > if > > > > > > > > > > no > > > > > > > > > > > replicas are not be moved between brokers. This falls > > into > > > > the > > > > > > "How > > > > > > > > to > > > > > > > > > > move > > > > > > > > > > > replica between log directories on the same broker" of > > the > > > > > > Proposed > > > > > > > > > > Change > > > > > > > > > > > section. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 11.2 If ChangeReplicaDirRequest specifies a replica > > > that's > > > > > not > > > > > > > > > created > > > > > > > > > > > yet, > > > > > > > > > > > > could the broker just remember that in memory and > > create > > > > the > > > > > > > > replica > > > > > > > > > > when > > > > > > > > > > > > the creation is requested? This way, when doing > cluster > > > > > > > expansion, > > > > > > > > we > > > > > > > > > > can > > > > > > > > > > > > make sure that the new replicas on the new brokers > are > > > > > created > > > > > > in > > > > > > > > the > > > > > > > > > > > right > > > > > > > > > > > > log directory in the first place. We can also avoid > the > > > > tool > > > > > > > having > > > > > > > > > to > > > > > > > > > > > keep > > > > > > > > > > > > issuing ChangeReplicaDirRequest in response to > > > > > > > > > > > > ReplicaNotAvailableException. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I am concerned that the ChangeReplicaDirRequest would > be > > > lost > > > > > if > > > > > > > > broker > > > > > > > > > > > restarts after it sends ChangeReplicaDirResponse but > > before > > > > it > > > > > > > > receives > > > > > > > > > > > LeaderAndIsrRequest. 
In this case, the user will > receive > > > > > success > > > > > > > when > > > > > > > > > > they > > > > > > > > > > > initiate replica reassignment, but replica reassignment > > > will > > > > > > never > > > > > > > > > > complete > > > > > > > > > > > when they verify the reassignment later. This would be > > > > > confusing > > > > > > to > > > > > > > > > user. > > > > > > > > > > > > > > > > > > > > > > There are three different approaches to this problem if > > > > broker > > > > > > has > > > > > > > > not > > > > > > > > > > > created replica yet after it receives > > > > ChangeReplicaDirResquest: > > > > > > > > > > > > > > > > > > > > > > 1) Broker immediately replies to user with > > > > > > > > ReplicaNotAvailableException > > > > > > > > > > and > > > > > > > > > > > user can decide to retry again later. The advantage of > > this > > > > > > > solution > > > > > > > > is > > > > > > > > > > > that the broker logic is very simple and the > reassignment > > > > > script > > > > > > > > logic > > > > > > > > > > also > > > > > > > > > > > seems straightforward. The disadvantage is that user > > script > > > > has > > > > > > to > > > > > > > > > retry. > > > > > > > > > > > But it seems fine - we can set interval between retries > > to > > > be > > > > > 0.5 > > > > > > > sec > > > > > > > > > so > > > > > > > > > > > that broker want be bombarded by those requests. This > is > > > the > > > > > > > solution > > > > > > > > > > > chosen in the current KIP. > > > > > > > > > > > > > > > > > > > > > > 2) Broker can put ChangeReplicaDirRequest in a > purgatory > > > with > > > > > > > timeout > > > > > > > > > and > > > > > > > > > > > replies to user after the replica has been created. I > > > didn't > > > > > > choose > > > > > > > > > this > > > > > > > > > > in > > > > > > > > > > > the interest of keeping broker logic simpler. > > > > > > > > > > > > > > > > > > > > > > 3) Broker can remember that by making a mark in the > disk, > > > > e.g. > > > > > > > create > > > > > > > > > > > topicPartition.tomove directory in the destination log > > > > > directory. > > > > > > > > This > > > > > > > > > > mark > > > > > > > > > > > will be persisted across broker restart. This is the > > first > > > > > idea I > > > > > > > had > > > > > > > > > > but I > > > > > > > > > > > replaced it with solution 1) in the interest of keeping > > > > broker > > > > > > > > simple. > > > > > > > > > > > > > > > > > > > > > > It seems that solution 1) is the simplest one that > works. > > > > But I > > > > > > am > > > > > > > OK > > > > > > > > > to > > > > > > > > > > > switch to the other two solutions if we don't want the > > > retry > > > > > > logic. > > > > > > > > > What > > > > > > > > > > do > > > > > > > > > > > you think? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 11.3 Do we need an option in the tool to specify > > > > intra.broker. > > > > > > > > > > > > throttled.rate? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I don't find it useful to add this option to > > > > > > > > > > kafka-reassign-partitions.sh. > > > > > > > > > > > The reason we have the option "--throttle" in the > script > > to > > > > > > > throttle > > > > > > > > > > > replication rate is that we usually want higher quota > to > > > fix > > > > an > > > > > > > > offline > > > > > > > > > > > replica to get out of URP. 
But we are OK to have a > lower > > > > quota > > > > > if > > > > > > > we > > > > > > > > > are > > > > > > > > > > > moving replica only to balance the cluster. Thus it is > > > common > > > > > for > > > > > > > SRE > > > > > > > > > to > > > > > > > > > > > use different quota when using > > kafka-reassign-partitions.sh > > > > to > > > > > > move > > > > > > > > > > replica > > > > > > > > > > > between brokers. > > > > > > > > > > > > > > > > > > > > > > However, the only reason for moving replica between log > > > > > > directories > > > > > > > > of > > > > > > > > > > the > > > > > > > > > > > same broker is to balance cluster resource. Thus the > > option > > > > to > > > > > > > > > > > specify intra.broker.throttled.rate in the tool is not > > that > > > > > > > useful. I > > > > > > > > > am > > > > > > > > > > > inclined not to add this option to keep this tool's > usage > > > > > > simpler. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 12. DescribeDirsRequest > > > > > > > > > > > > 12.1 In other requests like CreateTopicRequest, we > > return > > > > an > > > > > > > empty > > > > > > > > > list > > > > > > > > > > > in > > > > > > > > > > > > the response for an empty input list. If the input > list > > > is > > > > > > null, > > > > > > > we > > > > > > > > > > > return > > > > > > > > > > > > everything. We should probably follow the same > > convention > > > > > here. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks. I wasn't aware of this convention. I have > change > > > > > > > > > > > DescribeDirsRequest so that "null" indicates "all". > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 12.2 Do we need the topics field? Since the request > is > > > > about > > > > > > log > > > > > > > > > dirs, > > > > > > > > > > it > > > > > > > > > > > > makes sense to specify the log dirs. But it's weird > to > > > > > specify > > > > > > > > > topics. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The topics field is not necessary. But it is useful to > > > reduce > > > > > the > > > > > > > > > > response > > > > > > > > > > > size in case user are only interested in the status of > a > > > few > > > > > > > topics. > > > > > > > > > For > > > > > > > > > > > example, user may have initiated the reassignment of a > > > given > > > > > > > replica > > > > > > > > > from > > > > > > > > > > > one log directory to another log directory on the same > > > > broker, > > > > > > and > > > > > > > > the > > > > > > > > > > user > > > > > > > > > > > only wants to check the status of this given partition > by > > > > > looking > > > > > > > > > > > at DescribeDirsResponse. Thus this field is useful. > > > > > > > > > > > > > > > > > > > > > > I am not sure if it is weird to call this request > > > > > > > > DescribeDirsRequest. > > > > > > > > > > The > > > > > > > > > > > response is a map from log directory to information to > > some > > > > > > > > partitions > > > > > > > > > on > > > > > > > > > > > the log directory. Do you think we need to change the > > name > > > of > > > > > the > > > > > > > > > > request? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 12.3 DescribeDirsResponsePartition: Should we include > > > > > > firstOffset > > > > > > > > and > > > > > > > > > > > > nextOffset in the response? That could be useful to > > track > > > > the > > > > > > > > > progress > > > > > > > > > > of > > > > > > > > > > > > the movement. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yeah good point. I agree it is useful to include > > > logEndOffset > > > > > in > > > > > > > the > > > > > > > > > > > response. According to Log.scala doc the logEndOffset > is > > > > > > equivalent > > > > > > > > to > > > > > > > > > > the > > > > > > > > > > > nextOffset. User can track progress by checking the > > > > difference > > > > > > > > between > > > > > > > > > > > logEndOffset of the given partition in the source and > > > > > destination > > > > > > > log > > > > > > > > > > > directories. I have added logEndOffset to the > > > > > > > > > > DescribeDirsResponsePartition > > > > > > > > > > > in the KIP. > > > > > > > > > > > > > > > > > > > > > > But it seems that we don't need firstOffset in the > > > response. > > > > Do > > > > > > you > > > > > > > > > think > > > > > > > > > > > firstOffset is still needed? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 13. ChangeReplicaDirResponse: Do we need error code > at > > > both > > > > > > > levels? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > My bad. It is not needed. I have removed request level > > > error > > > > > > code. > > > > > > > I > > > > > > > > > also > > > > > > > > > > > added ChangeReplicaDirRequestTopic and > > > > > > > ChangeReplicaDirResponseTopic > > > > > > > > to > > > > > > > > > > > reduce duplication of the "topic" string in the request > > and > > > > > > > response. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 14. num.replica.move.threads: Does it default to # > log > > > > dirs? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > No. It doesn't. I expect default number to be set to a > > > > > > conservative > > > > > > > > > value > > > > > > > > > > > such as 3. It may be surprising to user if the number > of > > > > > threads > > > > > > > > > increase > > > > > > > > > > > just because they have assigned more log directories to > > > Kafka > > > > > > > broker. > > > > > > > > > > > > > > > > > > > > > > It seems that the number of replica move threads > doesn't > > > have > > > > > to > > > > > > > > depend > > > > > > > > > > on > > > > > > > > > > > the number of log directories. It is possible to have > one > > > > > thread > > > > > > > that > > > > > > > > > > moves > > > > > > > > > > > replicas across all log directories. On the other hand > we > > > can > > > > > > have > > > > > > > > > > multiple > > > > > > > > > > > threads to move replicas to the same log directory. For > > > > > example, > > > > > > if > > > > > > > > > > broker > > > > > > > > > > > uses SSD, the CPU instead of disk IO may be the replica > > > move > > > > > > > > bottleneck > > > > > > > > > > and > > > > > > > > > > > it will be faster to move replicas using multiple > threads > > > per > > > > > log > > > > > > > > > > > directory. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin < > > > > > lindon...@gmail.com> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > I just made one correction in the KIP. 
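Returning to the progress-tracking point above: assuming DescribeDirsResponse reports logEndOffset for both the source log and the temporary .move log, the remaining work of an intra-broker move could be estimated roughly as in this hypothetical sketch.

// Hypothetical sketch of progress tracking via logEndOffset, as discussed above: the
// remaining work is the gap between the logEndOffset of the source log and that of the
// topicPartition.move log in the destination directory.
public class MoveProgressSketch {

    /** Placeholder for the per-replica fields of a DescribeDirsResponse. */
    record LogStatus(long logEndOffset, boolean isTemporary) {}

    /** Remaining offsets to copy; 0 once the .move log has caught up or the swap happened. */
    public long remainingLag(LogStatus sourceLog, LogStatus moveLog) {
        if (!moveLog.isTemporary()) {
            return 0L; // the destination is already the live replica
        }
        return Math.max(0L, sourceLog.logEndOffset() - moveLog.logEndOffset());
    }
}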
If broker > > > receives > > > > > > > > > > > > > ChangeReplicaDirRequest and the replica hasn't been > > > > created > > > > > > > > there, > > > > > > > > > > the > > > > > > > > > > > > > broker will respond ReplicaNotAvailableException. > > > > > > > > > > > > > The kafka-reassignemnt-partitions.sh will need to > > > > re-send > > > > > > > > > > > > > ChangeReplicaDirRequest in this case in order to > wait > > > for > > > > > > > > > controller > > > > > > > > > > to > > > > > > > > > > > > > send LeaderAndIsrRequest to broker. The previous > > > approach > > > > > of > > > > > > > > > creating > > > > > > > > > > > an > > > > > > > > > > > > > empty directory seems hacky. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin < > > > > > > lindon...@gmail.com> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Jun, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your comments! I have updated the KIP > to > > > > > address > > > > > > > > your > > > > > > > > > > > > > comments. > > > > > > > > > > > > > > Please see my reply inline. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Can you let me know if the latest KIP has > addressed > > > > your > > > > > > > > > comments? > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao < > > > > > j...@confluent.io> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > >> Hi, Dong, > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> Thanks for the reply. > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> 1.3 So the thread gets the lock, checks if > caught > > up > > > > and > > > > > > > > > releases > > > > > > > > > > > the > > > > > > > > > > > > > lock > > > > > > > > > > > > > >> if not? Then, in the case when there is > continuous > > > > > > incoming > > > > > > > > > data, > > > > > > > > > > > the > > > > > > > > > > > > > >> thread may never get a chance to swap. One way > to > > > > > address > > > > > > > this > > > > > > > > > is > > > > > > > > > > > when > > > > > > > > > > > > > the > > > > > > > > > > > > > >> thread is getting really close in catching up, > > just > > > > hold > > > > > > > onto > > > > > > > > > the > > > > > > > > > > > lock > > > > > > > > > > > > > >> until the thread fully catches up. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, that was my original solution. I see your > > point > > > > that > > > > > > the > > > > > > > > > lock > > > > > > > > > > > may > > > > > > > > > > > > > not > > > > > > > > > > > > > > be fairly assigned to ReplicaMoveThread and > > > > > > > > RequestHandlerThread > > > > > > > > > > when > > > > > > > > > > > > > there > > > > > > > > > > > > > > is frequent incoming requets. You solution should > > > > address > > > > > > the > > > > > > > > > > problem > > > > > > > > > > > > > and I > > > > > > > > > > > > > > have updated the KIP to use it. 
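A bare-bones sketch of that catch-up-and-swap step, under the approach just agreed on: copy without the lock while far behind, then hold the per-partition lock until fully caught up and swap. All names are hypothetical; the real swap details are in the KIP.

// Hypothetical sketch of the catch-up-and-swap step discussed above. Request-handler
// threads cannot append to the source log while the lock is held, so the swap is safe.
import java.util.concurrent.locks.ReentrantLock;

public class ReplicaMoveSwapSketch {

    interface Log {
        long endOffset();
        void copyUpTo(Log source, long targetOffset); // append the missing data from source
    }

    static final long NEARLY_CAUGHT_UP = 10_000; // illustrative threshold, in offsets

    public void moveAndSwap(Log source, Log moveLog, ReentrantLock partitionLock) {
        // Phase 1: copy without holding the lock, so produce/fetch handling is not blocked.
        while (source.endOffset() - moveLog.endOffset() > NEARLY_CAUGHT_UP) {
            moveLog.copyUpTo(source, source.endOffset());
        }
        // Phase 2: once nearly caught up, hold the lock until fully caught up, then swap,
        // so no new data can be appended to the source log in between.
        partitionLock.lock();
        try {
            moveLog.copyUpTo(source, source.endOffset());
            swap(source, moveLog); // e.g. rename topicPartition.move -> topicPartition
        } finally {
            partitionLock.unlock();
        }
    }

    private void swap(Log source, Log moveLog) {
        // Placeholder: the actual directory rename and replica map update are per the KIP.
    }
}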
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> 2.3 So, you are saying that the partition > > > reassignment > > > > > > tool > > > > > > > > can > > > > > > > > > > > first > > > > > > > > > > > > > send > > > > > > > > > > > > > >> a ChangeReplicaDirRequest to relevant brokers to > > > > > establish > > > > > > > the > > > > > > > > > log > > > > > > > > > > > dir > > > > > > > > > > > > > for > > > > > > > > > > > > > >> replicas not created yet, then trigger the > > partition > > > > > > > movement > > > > > > > > > > across > > > > > > > > > > > > > >> brokers through the controller? That's actually > a > > > good > > > > > > idea. > > > > > > > > > Then, > > > > > > > > > > > we > > > > > > > > > > > > > can > > > > > > > > > > > > > >> just leave LeaderAndIsrRequest as it is. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, that is what I plan to do. If broker > receives > > a > > > > > > > > > > > > > > ChangeReplicaDirRequest while it is not leader or > > > > > follower > > > > > > of > > > > > > > > the > > > > > > > > > > > > > > partition, the broker will create an empty Log > > > instance > > > > > > > (i.e. a > > > > > > > > > > > > directory > > > > > > > > > > > > > > named topicPartition) in the destination log > > > directory > > > > so > > > > > > > that > > > > > > > > > the > > > > > > > > > > > > > replica > > > > > > > > > > > > > > will be placed there when broker receives > > > > > > LeaderAndIsrRequest > > > > > > > > > from > > > > > > > > > > > the > > > > > > > > > > > > > > broker. The broker should clean up empty those > Log > > > > > > instances > > > > > > > on > > > > > > > > > > > startup > > > > > > > > > > > > > > just in case a ChangeReplicaDirRequest was > > mistakenly > > > > > sent > > > > > > > to a > > > > > > > > > > > broker > > > > > > > > > > > > > that > > > > > > > > > > > > > > was not meant to be follower/leader of the > > > partition.. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> Another thing related to > > > > > > > > > > > > > >> ChangeReplicaDirRequest. > > > > > > > > > > > > > >> Since this request may take long to complete, I > am > > > not > > > > > > sure > > > > > > > if > > > > > > > > > we > > > > > > > > > > > > should > > > > > > > > > > > > > >> wait for the movement to complete before > respond. > > > > While > > > > > > > > waiting > > > > > > > > > > for > > > > > > > > > > > > the > > > > > > > > > > > > > >> movement to complete, the idle connection may be > > > > killed > > > > > or > > > > > > > the > > > > > > > > > > > client > > > > > > > > > > > > > may > > > > > > > > > > > > > >> be gone already. An alternative is to return > > > > immediately > > > > > > and > > > > > > > > > add a > > > > > > > > > > > new > > > > > > > > > > > > > >> request like CheckReplicaDirRequest to see if > the > > > > > movement > > > > > > > has > > > > > > > > > > > > > completed. > > > > > > > > > > > > > >> The tool can take advantage of that to check the > > > > status. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree with your concern and solution. We need > > > request > > > > > to > > > > > > > > query > > > > > > > > > > the > > > > > > > > > > > > > > partition -> log_directory mapping on the > broker. 
I > > > > have > > > > > > > > updated > > > > > > > > > > the > > > > > > > > > > > > KIP > > > > > > > > > > > > > to > > > > > > > > > > > > > > remove need for ChangeReplicaDirRequestPurgato > ry. > > > > > > > > > > > > > > Instead, kafka-reassignemnt-partitions.sh will > > send > > > > > > > > > > > > DescribeDirsRequest > > > > > > > > > > > > > > to brokers when user wants to verify the > partition > > > > > > > assignment. > > > > > > > > > > Since > > > > > > > > > > > we > > > > > > > > > > > > > > need this DescribeDirsRequest anyway, we can also > > use > > > > > this > > > > > > > > > request > > > > > > > > > > to > > > > > > > > > > > > > > expose stats like the individual log size instead > > of > > > > > using > > > > > > > JMX. > > > > > > > > > One > > > > > > > > > > > > > > drawback of using JMX is that user has to manage > > the > > > > JMX > > > > > > port > > > > > > > > and > > > > > > > > > > > > related > > > > > > > > > > > > > > credentials if they haven't already done this, > > which > > > is > > > > > the > > > > > > > > case > > > > > > > > > at > > > > > > > > > > > > > > LinkedIn. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> Thanks, > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> Jun > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin < > > > > > > > lindon...@gmail.com > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > Hey Jun, > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > Thanks for the detailed explanation. I will > use > > > the > > > > > > > separate > > > > > > > > > > > thread > > > > > > > > > > > > > >> pool to > > > > > > > > > > > > > >> > move replica between log directories. I will > let > > > you > > > > > > know > > > > > > > > when > > > > > > > > > > the > > > > > > > > > > > > KIP > > > > > > > > > > > > > >> has > > > > > > > > > > > > > >> > been updated to use a separate thread pool. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > Here is my response to your other questions: > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > 1.3 My idea is that the ReplicaMoveThread that > > > moves > > > > > > data > > > > > > > > > should > > > > > > > > > > > get > > > > > > > > > > > > > the > > > > > > > > > > > > > >> > lock before checking whether the replica in > the > > > > > > > destination > > > > > > > > > log > > > > > > > > > > > > > >> directory > > > > > > > > > > > > > >> > has caught up. If the new replica has caught > up, > > > > then > > > > > > the > > > > > > > > > > > > > >> ReplicaMoveThread > > > > > > > > > > > > > >> > should swaps the replica while it is still > > holding > > > > the > > > > > > > lock. > > > > > > > > > The > > > > > > > > > > > > > >> > ReplicaFetcherThread or RequestHandlerThread > > will > > > > not > > > > > be > > > > > > > > able > > > > > > > > > to > > > > > > > > > > > > > append > > > > > > > > > > > > > >> > data to the replica in the source replica > during > > > > this > > > > > > > period > > > > > > > > > > > because > > > > > > > > > > > > > >> they > > > > > > > > > > > > > >> > can not get the lock. Does this address the > > > problem? > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > 2.3 I get your point that we want to keep > > > controller > > > > > > > > simpler. 
> > > > > > > > > If > > > > > > > > > > > > admin > > > > > > > > > > > > > >> tool > > > > > > > > > > > > > >> > can send ChangeReplicaDirRequest to move data > > > > within a > > > > > > > > broker, > > > > > > > > > > > then > > > > > > > > > > > > > >> > controller probably doesn't even need to > include > > > log > > > > > > > > directory > > > > > > > > > > > path > > > > > > > > > > > > in > > > > > > > > > > > > > >> the > > > > > > > > > > > > > >> > LeaderAndIsrRequest. How about this: > controller > > > will > > > > > > only > > > > > > > > deal > > > > > > > > > > > with > > > > > > > > > > > > > >> > reassignment across brokers as it does now. If > > > user > > > > > > > > specified > > > > > > > > > > > > > >> destination > > > > > > > > > > > > > >> > replica for any disk, the admin tool will send > > > > > > > > > > > > ChangeReplicaDirRequest > > > > > > > > > > > > > >> and > > > > > > > > > > > > > >> > wait for response from broker to confirm that > > all > > > > > > replicas > > > > > > > > > have > > > > > > > > > > > been > > > > > > > > > > > > > >> moved > > > > > > > > > > > > > >> > to the destination log direcotry. The broker > > will > > > > put > > > > > > > > > > > > > >> > ChangeReplicaDirRequset in a purgatory and > > respond > > > > > > either > > > > > > > > when > > > > > > > > > > the > > > > > > > > > > > > > >> movement > > > > > > > > > > > > > >> > is completed or when the request has > timed-out. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > 4. I agree that we can expose these metrics > via > > > JMX. > > > > > > But I > > > > > > > > am > > > > > > > > > > not > > > > > > > > > > > > sure > > > > > > > > > > > > > >> if > > > > > > > > > > > > > >> > it can be obtained easily with good > performance > > > > using > > > > > > > either > > > > > > > > > > > > existing > > > > > > > > > > > > > >> tools > > > > > > > > > > > > > >> > or new script in kafka. I will ask SREs for > > their > > > > > > opinion. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > Thanks, > > > > > > > > > > > > > >> > Dong > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao < > > > > > > j...@confluent.io > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > Hi, Dong, > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > Thanks for the updated KIP. A few more > > comments > > > > > below. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > 1.1 and 1.2: I am still not sure there is > > enough > > > > > > benefit > > > > > > > > of > > > > > > > > > > > > reusing > > > > > > > > > > > > > >> > > ReplicaFetchThread > > > > > > > > > > > > > >> > > to move data across disks. > > > > > > > > > > > > > >> > > (a) A big part of ReplicaFetchThread is to > > deal > > > > with > > > > > > > > issuing > > > > > > > > > > and > > > > > > > > > > > > > >> tracking > > > > > > > > > > > > > >> > > fetch requests. So, it doesn't feel that we > > get > > > > much > > > > > > > from > > > > > > > > > > > reusing > > > > > > > > > > > > > >> > > ReplicaFetchThread > > > > > > > > > > > > > >> > > only to disable the fetching part. 
> > > > > > > > > > > > > >> > > (b) The leader replica has no > > ReplicaFetchThread > > > > to > > > > > > > start > > > > > > > > > > with. > > > > > > > > > > > It > > > > > > > > > > > > > >> feels > > > > > > > > > > > > > >> > > weird to start one just for intra broker > data > > > > > > movement. > > > > > > > > > > > > > >> > > (c) The ReplicaFetchThread is per broker. > > > > > Intuitively, > > > > > > > the > > > > > > > > > > > number > > > > > > > > > > > > of > > > > > > > > > > > > > >> > > threads doing intra broker data movement > > should > > > be > > > > > > > related > > > > > > > > > to > > > > > > > > > > > the > > > > > > > > > > > > > >> number > > > > > > > > > > > > > >> > of > > > > > > > > > > > > > >> > > disks in the broker, not the number of > brokers > > > in > > > > > the > > > > > > > > > cluster. > > > > > > > > > > > > > >> > > (d) If the destination disk fails, we want > to > > > stop > > > > > the > > > > > > > > intra > > > > > > > > > > > > broker > > > > > > > > > > > > > >> data > > > > > > > > > > > > > >> > > movement, but want to continue inter broker > > > > > > replication. > > > > > > > > So, > > > > > > > > > > > > > >> logically, > > > > > > > > > > > > > >> > it > > > > > > > > > > > > > >> > > seems it's better to separate out the two. > > > > > > > > > > > > > >> > > (e) I am also not sure if we should reuse > the > > > > > existing > > > > > > > > > > > throttling > > > > > > > > > > > > > for > > > > > > > > > > > > > >> > > replication. It's designed to handle traffic > > > > across > > > > > > > > brokers > > > > > > > > > > and > > > > > > > > > > > > the > > > > > > > > > > > > > >> > > delaying is done in the fetch request. So, > if > > we > > > > are > > > > > > not > > > > > > > > > doing > > > > > > > > > > > > > >> > > fetching in ReplicaFetchThread, > > > > > > > > > > > > > >> > > I am not sure the existing throttling is > > > > effective. > > > > > > > Also, > > > > > > > > > when > > > > > > > > > > > > > >> specifying > > > > > > > > > > > > > >> > > the throttling of moving data across disks, > it > > > > seems > > > > > > the > > > > > > > > > user > > > > > > > > > > > > > >> shouldn't > > > > > > > > > > > > > >> > > care about whether a replica is a leader or > a > > > > > > follower. > > > > > > > > > > Reusing > > > > > > > > > > > > the > > > > > > > > > > > > > >> > > existing throttling config name will be > > awkward > > > in > > > > > > this > > > > > > > > > > regard. > > > > > > > > > > > > > >> > > (f) It seems it's simpler and more > consistent > > to > > > > > use a > > > > > > > > > > separate > > > > > > > > > > > > > thread > > > > > > > > > > > > > >> > pool > > > > > > > > > > > > > >> > > for local data movement (for both leader and > > > > > follower > > > > > > > > > > replicas). > > > > > > > > > > > > > This > > > > > > > > > > > > > >> > > process can then be configured (e.g. number > of > > > > > > threads, > > > > > > > > etc) > > > > > > > > > > and > > > > > > > > > > > > > >> > throttled > > > > > > > > > > > > > >> > > independently. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > 1.3 Yes, we will need some synchronization > > > there. 
> > > > > So, > > > > > > if > > > > > > > > the > > > > > > > > > > > > > movement > > > > > > > > > > > > > >> > > thread catches up, gets the lock to do the > > swap, > > > > but > > > > > > > > > realizes > > > > > > > > > > > that > > > > > > > > > > > > > new > > > > > > > > > > > > > >> > data > > > > > > > > > > > > > >> > > is added, it has to continue catching up > while > > > > > holding > > > > > > > the > > > > > > > > > > lock? > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > 2.3 The benefit of including the desired log > > > > > directory > > > > > > > in > > > > > > > > > > > > > >> > > LeaderAndIsrRequest > > > > > > > > > > > > > >> > > during partition reassignment is that the > > > > controller > > > > > > > > doesn't > > > > > > > > > > > need > > > > > > > > > > > > to > > > > > > > > > > > > > >> > track > > > > > > > > > > > > > >> > > the progress for disk movement. So, you > don't > > > need > > > > > the > > > > > > > > > > > additional > > > > > > > > > > > > > >> > > BrokerDirStateUpdateRequest. Then the > > controller > > > > > never > > > > > > > > needs > > > > > > > > > > to > > > > > > > > > > > > > issue > > > > > > > > > > > > > >> > > ChangeReplicaDirRequest. > > > > > > > > > > > > > >> > > Only the admin tool will issue > > > > > ChangeReplicaDirRequest > > > > > > > to > > > > > > > > > move > > > > > > > > > > > > data > > > > > > > > > > > > > >> > within > > > > > > > > > > > > > >> > > a broker. I agree that this makes > > > > > LeaderAndIsrRequest > > > > > > > more > > > > > > > > > > > > > >> complicated, > > > > > > > > > > > > > >> > but > > > > > > > > > > > > > >> > > that seems simpler than changing the > > controller > > > to > > > > > > track > > > > > > > > > > > > additional > > > > > > > > > > > > > >> > states > > > > > > > > > > > > > >> > > during partition reassignment. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > 4. We want to make a decision on how to > expose > > > the > > > > > > > stats. > > > > > > > > So > > > > > > > > > > > far, > > > > > > > > > > > > we > > > > > > > > > > > > > >> are > > > > > > > > > > > > > >> > > exposing stats like the individual log size > as > > > > JMX. > > > > > > So, > > > > > > > > one > > > > > > > > > > way > > > > > > > > > > > is > > > > > > > > > > > > > to > > > > > > > > > > > > > >> > just > > > > > > > > > > > > > >> > > add new jmx to expose the log directory of > > > > > individual > > > > > > > > > > replicas. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > Thanks, > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > Jun > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin < > > > > > > > > > > lindon...@gmail.com> > > > > > > > > > > > > > >> wrote: > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > Hey Jun, > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > Thanks for all the comments! Please see my > > > > answer > > > > > > > > below. I > > > > > > > > > > > have > > > > > > > > > > > > > >> updated > > > > > > > > > > > > > >> > > the > > > > > > > > > > > > > >> > > > KIP to address most of the questions and > > make > > > > the > > > > > > KIP > > > > > > > > > easier > > > > > > > > > > > to > > > > > > > > > > > > > >> > > understand. 
> > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > Thanks, > > > > > > > > > > > > > >> > > > Dong > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao < > > > > > > > > j...@confluent.io > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > Hi, Dong, > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > Thanks for the KIP. A few comments > below. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > 1. For moving data across directories > > > > > > > > > > > > > >> > > > > 1.1 I am not sure why we want to use > > > > > > > > > ReplicaFetcherThread > > > > > > > > > > to > > > > > > > > > > > > > move > > > > > > > > > > > > > >> > data > > > > > > > > > > > > > >> > > > > around in the leader. ReplicaFetchThread > > > > fetches > > > > > > > data > > > > > > > > > from > > > > > > > > > > > > > socket. > > > > > > > > > > > > > >> > For > > > > > > > > > > > > > >> > > > > moving data locally, it seems that we > want > > > to > > > > > > avoid > > > > > > > > the > > > > > > > > > > > socket > > > > > > > > > > > > > >> > > overhead. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > The purpose of using ReplicaFetchThread is > > to > > > > > re-use > > > > > > > > > > existing > > > > > > > > > > > > > thread > > > > > > > > > > > > > >> > > > instead of creating more threads and make > > our > > > > > thread > > > > > > > > model > > > > > > > > > > > more > > > > > > > > > > > > > >> > complex. > > > > > > > > > > > > > >> > > It > > > > > > > > > > > > > >> > > > seems like a nature choice for copying > data > > > > > between > > > > > > > > disks > > > > > > > > > > > since > > > > > > > > > > > > it > > > > > > > > > > > > > >> is > > > > > > > > > > > > > >> > > > similar to copying data between brokers. > > > Another > > > > > > > reason > > > > > > > > is > > > > > > > > > > > that > > > > > > > > > > > > if > > > > > > > > > > > > > >> the > > > > > > > > > > > > > >> > > > replica to be moved is a follower, we > don't > > > need > > > > > > lock > > > > > > > to > > > > > > > > > > swap > > > > > > > > > > > > > >> replicas > > > > > > > > > > > > > >> > > when > > > > > > > > > > > > > >> > > > destination replica has caught up, since > the > > > > same > > > > > > > thread > > > > > > > > > > which > > > > > > > > > > > > is > > > > > > > > > > > > > >> > > fetching > > > > > > > > > > > > > >> > > > data from leader will swap the replica. > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > The ReplicaFetchThread will not incur > socket > > > > > > overhead > > > > > > > > > while > > > > > > > > > > > > > copying > > > > > > > > > > > > > >> > data > > > > > > > > > > > > > >> > > > between disks. It will read directly from > > > source > > > > > > disk > > > > > > > > (as > > > > > > > > > we > > > > > > > > > > > do > > > > > > > > > > > > > when > > > > > > > > > > > > > >> > > > processing FetchRequest) and write to > > > > destination > > > > > > disk > > > > > > > > (as > > > > > > > > > > we > > > > > > > > > > > do > > > > > > > > > > > > > >> when > > > > > > > > > > > > > >> > > > processing ProduceRequest). 
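As a rough illustration of that local read-then-append path, here is a hypothetical sketch with a crude per-second byte budget standing in for intra.broker.throttled.rate; everything below is an assumption for illustration, not the KIP's implementation.

// Hypothetical sketch of the intra-broker copy described above: read a batch from the
// source log (like serving a FetchRequest) and append it to the destination log (like
// handling a ProduceRequest), pausing when the throttle budget for the second is used up.
public class IntraBrokerCopySketch {

    /** Placeholder batch: the records read plus the offset that follows them. */
    record Batch(byte[] records, long nextOffset) {}

    interface LocalLog {
        long endOffset();
        Batch read(long offset, int maxBytes); // local read, no socket involved
        void append(byte[] records);           // local append, no socket involved
    }

    public void copy(LocalLog source, LocalLog destination,
                     long startOffset, long throttleBytesPerSec) throws InterruptedException {
        long offset = startOffset;
        long windowStart = System.currentTimeMillis();
        long bytesInWindow = 0;
        while (offset < source.endOffset()) {
            Batch batch = source.read(offset, 1024 * 1024); // 1 MB batches, illustrative
            destination.append(batch.records());
            offset = batch.nextOffset();
            bytesInWindow += batch.records().length;
            // Crude stand-in for intra.broker.throttled.rate: sleep out the rest of the
            // second once the per-second byte budget is exhausted.
            if (bytesInWindow >= throttleBytesPerSec) {
                long elapsed = System.currentTimeMillis() - windowStart;
                if (elapsed < 1000) Thread.sleep(1000 - elapsed);
                windowStart = System.currentTimeMillis();
                bytesInWindow = 0;
            }
        }
    }
}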
> > 1.2 I am also not sure about moving data in the ReplicaFetcherThread in the follower. For example, I am not sure setting replica.fetch.max.wait to 0 is ideal. It may not always be effective since a fetch request in the ReplicaFetcherThread could be arbitrarily delayed due to replication throttling on the leader. In general, the data movement logic across disks seems different from that in ReplicaFetcherThread. So, I am not sure why they need to be coupled.

> While it may not be the most efficient way to copy data between local disks, it will be at least as efficient as copying data from the leader to the destination disk. The expected goal of KIP-113 is to enable data movement between disks with no less efficiency than what we do now when moving data between brokers. I think we can optimize its performance using a separate thread if the performance is not good enough.

> > 1.3 Could you add a bit more details on how we swap the replicas when the new ones are fully caught up? For example, what happens when the new replica in the new log directory is caught up, but when we want to do the swap, some new data has arrived?
> If the replica is a leader, then the ReplicaFetcherThread will perform the replacement. A proper lock is needed to prevent KafkaRequestHandler from appending data to the topicPartition.log on the source disk before this replacement is completed by the ReplicaFetcherThread.

> If the replica is a follower, no lock is needed, because the same ReplicaFetcherThread which fetches data from the leader will also swap the replica.

> I have updated the KIP to specify both more explicitly.

> > 1.4 Do we need to do the .move at the log segment level or could we just do that at the replica directory level? Renaming just a directory is much faster than renaming the log segments.

> Great point. I have updated the KIP to rename the log directory instead.

> > 1.5 Could you also describe a bit what happens when either the source or the target log directory fails while the data moving is in progress?

> If the source log directory fails, then the replica movement will stop and the source replica is marked offline. If the destination log directory fails, then the replica movement will stop. I have updated the KIP to clarify this.
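Looping back to 1.3: a minimal sketch of the leader-side catch-up, lock, and swap loop might look as follows. Every name here (appendLock, copyUpTo, swapLogDirectories) is invented for the illustration and this is not code from the KIP; it only shows why the lock can be held briefly and why the loop retries when new data arrives between catching up and swapping.

    import java.util.concurrent.locks.ReentrantLock;

    public class LeaderReplicaSwapSketch {
        private final ReentrantLock appendLock = new ReentrantLock(); // taken by producer appends (not shown)
        private volatile long sourceLogEndOffset = 1_000L;            // advanced whenever producers append
        private long movedLogEndOffset = 0L;                          // how far the copy in the new dir has gotten

        void catchUpAndSwap() {
            while (true) {
                copyUpTo(sourceLogEndOffset);          // catch up without blocking producers
                appendLock.lock();                     // block appends only for the final check
                try {
                    if (movedLogEndOffset == sourceLogEndOffset) {
                        swapLogDirectories();          // e.g. rename topic-0.move to topic-0 and switch references
                        return;
                    }
                    // New data arrived while we were copying: release the lock and catch up again.
                } finally {
                    appendLock.unlock();
                }
            }
        }

        private void copyUpTo(long targetOffset) { movedLogEndOffset = targetOffset; }
        private void swapLogDirectories() { System.out.println("swapped at offset " + movedLogEndOffset); }

        public static void main(String[] args) {
            new LeaderReplicaSwapSketch().catchUpAndSwap();
        }
    }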
> > 2. For partition reassignment.
> > 2.1 I am not sure if the controller can block on ChangeReplicaDirRequest. Data movement may take a long time to complete. If there is an outstanding request from the controller to a broker, that broker won't be able to process any new request from the controller. So if another event (e.g. broker failure) happens when the data movement is in progress, subsequent LeaderAndIsrRequests will be delayed.

> Yeah, good point. I missed the fact that there is only one inflight request from the controller to a broker.

> How about I add a request, e.g. BrokerDirStateUpdateRequest, which maps topicPartition to log directory and can be sent from broker to controller to indicate completion?

> > 2.2 In the KIP, the partition reassignment tool is also used for cases where an admin just wants to balance the existing data across log directories in the broker. In this case, it seems that it's overkill to have the process go through the controller. A simpler approach is to issue an RPC request to the broker directly.

> I agree we can optimize this case. It is just that we have to add new logic or a code path to handle a scenario that is already covered by the more complicated scenario.
> I will add it to the KIP.

> > 2.3 When using the partition reassignment tool to move replicas across brokers, it makes sense to be able to specify the log directory of the newly created replicas. The KIP does that in two separate requests, ChangeReplicaDirRequest and LeaderAndIsrRequest, and tracks the progress of each independently. An alternative is to do that just in LeaderAndIsrRequest. That way, the new replicas will be created in the right log dir in the first place and the controller just needs to track the progress of partition reassignment in the current way.

> I agree it is better to use one request instead of two to request replica movement between disks. But I think the performance advantage of doing so is negligible because we trigger replica reassignment much less often than all other kinds of events in the Kafka cluster. I am not sure that the benefit of doing this is worth the effort to add an optional string field in the LeaderAndIsrRequest. Also, if we add this optional field in the LeaderAndIsrRequest, we probably want to remove ChangeReplicaDirRequest to avoid having two requests doing the same thing. But that means a user script can not send a request directly to the broker to trigger replica movement between log directories.
> I will do it if you feel strongly about this optimization.

> > 3. /admin/reassign_partitions: Including the log dir in every replica may not be efficient. We could include a list of log directories and reference the index of the log directory in each replica.

> Good point. I have updated the KIP to use this solution.

> > 4. DescribeDirsRequest: The stats in the request are already available from JMX. Do we need the new request?

> Does JMX also include the state (i.e. offline or online) of each log directory and the log directory of each replica? If not, then maybe we still need DescribeDirsRequest?

> > 5. We want to be consistent on ChangeReplicaDirRequest vs ChangeReplicaRequest.

> I think ChangeReplicaRequest and ChangeReplicaResponse is my typo. Sorry, they are fixed now.

> > Thanks,
> > Jun

> > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > Hey Alexey,

> > > Thanks for all the comments!

> > > I have updated the KIP to specify how we enforce quota.
> > > I also updated "The thread model and broker logic for moving replica data between log directories" to make it easier to read. You can find the exact change here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>. The idea is to use the same replication quota mechanism introduced in KIP-73.

> > > Thanks,
> > > Dong

> > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:

> > > > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:

> > > > > Hey Alexey,

> > > > > Thanks. I think we agreed that the suggested solution doesn't work in general for kafka users. To answer your questions:

> > > > > 1. I agree we need a quota to rate-limit replica movement when a broker is moving a "leader" replica. I will come up with a solution, probably re-using the config of the replication quota introduced in KIP-73.

> > > > > 2. Good point. I agree that this is a problem in general.
> > > > > If there is no new data on that broker, then with the current default values of replica.fetch.wait.max.ms and replica.fetch.max.bytes, the replica will be moved at only 2 MBps throughput. I think the solution is for the broker to set replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding ReplicaFetcherThread needs to move some replica to another disk.

> > > > > 3. I have updated the KIP to mention that the read size of a given partition is configured using replica.fetch.max.bytes when we move replicas between disks.

> > > > > Please see this <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5> for the change of the KIP. I will come up with a solution to throttle replica movement when a broker is moving a "leader" replica.
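A small sketch of point 2 above, under the assumption of a hypothetical fetcher-side helper (the names are made up and Kafka's actual fetcher internals are not shown): the only idea is that the next FetchRequest uses a zero wait time while a local move is pending, so a quiet partition cannot stall the copy.

    public class FetchWaitSketch {
        private final long configuredMaxWaitMs;        // value of replica.fetch.wait.max.ms from the broker config
        private volatile boolean hasPendingLocalMove;  // does this fetcher thread still have a replica to move?

        FetchWaitSketch(long configuredMaxWaitMs) {
            this.configuredMaxWaitMs = configuredMaxWaitMs;
        }

        void setPendingLocalMove(boolean pending) {
            this.hasPendingLocalMove = pending;
        }

        // maxWait to put into the next FetchRequest: 0 while a move is in flight, so the fetch
        // returns immediately and processPartitionData runs often enough to keep the copy going.
        long nextFetchMaxWaitMs() {
            return hasPendingLocalMove ? 0L : configuredMaxWaitMs;
        }

        public static void main(String[] args) {
            FetchWaitSketch sketch = new FetchWaitSketch(500L);
            sketch.setPendingLocalMove(true);
            System.out.println("maxWait while moving: " + sketch.nextFetchMaxWaitMs() + " ms");
        }
    }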
> > > > Thanks. It looks great.

> > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:

> > > > > > 23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:

> > > > > > > Thanks. Please see my comments inline.

> > > > > > > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:

> > > > > > > > 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:

> > > > > > > > > Hey Alexey,

> > > > > > > > > Thanks for your review and the alternative approach. Here is my understanding of your patch. Kafka's background threads are used to move data between replicas. When data movement is triggered, the log will be rolled and the new logs will be put in the new directory, and background threads will move segments from the old directory to the new directory.

> > > > > > > > > It is important to note that KIP-112 is intended to work with KIP-113 to support JBOD. I think your solution is definitely simpler and better under the current Kafka implementation, where a broker will fail if any disk fails. But I am not sure if we want to allow a broker to run with a partial disk failure.
> > > > > > > > > Let's say a replica is being moved from log_dir_old to log_dir_new and then log_dir_old stops working due to disk failure. How would your existing patch handle it?

> > > > > > > > We will lose log_dir_old. After broker restart we can read the data from log_dir_new.

> > > > > > > No, you probably can't. This is because the broker doesn't have *all* the data for this partition. For example, say the broker has partition_segment_1, partition_segment_50 and partition_segment_100 on log_dir_old. partition_segment_100, which has the latest data, has been moved to log_dir_new, and log_dir_old fails before partition_segment_50 and partition_segment_1 are moved to log_dir_new. When the broker restarts, it won't have partition_segment_50. This causes a problem if the broker is elected leader and a consumer wants to consume data in partition_segment_1.

> > > > > > Right.
> > > > > > > > > To make the scenario a bit more complicated, let's say the broker is shut down, log_dir_old's disk fails, and the broker starts. In this case the broker doesn't even know if log_dir_new has all the data needed for this replica. It becomes a problem if the broker is elected leader of this partition in this case.

> > > > > > > > log_dir_new contains the most recent data, so we will lose the tail of the partition. This is not a big problem for us because we already delete tails by hand (see https://issues.apache.org/jira/browse/KAFKA-1712). Also we don't use automatic leader balancing (auto.leader.rebalance.enable=false), so this partition becomes the leader with a low probability. I think my patch can be modified to prohibit the selection of the leader until the partition has moved completely.

> > > > > > > I guess you are saying that you have deleted the tails by hand in your own kafka branch.
> > > > > > No. We just modify segments' mtime with a cron job. This works with vanilla kafka.

> > > > > > > But KAFKA-1712 is not accepted into Kafka trunk and I am not sure if it is the right solution. How would this solution address the problem mentioned above?

> > > > > > If you need only fresh data and if you remove old data by hand, this is not a problem. But in the general case this is a problem, of course.

> > > > > > > BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to address its problem. Now that we have a timestamp in the message, we can use that to delete old segments instead of relying on the log segment mtime. Just some idea and we don't have to discuss this problem here.
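Purely to illustrate the timestamp idea above (a self-contained sketch; the Segment record is invented and is not a Kafka class): retention can be decided from the largest message timestamp recorded in a segment rather than from the file's mtime, so touching files from a cron job would no longer influence deletion.

    import java.time.Duration;
    import java.time.Instant;
    import java.util.List;

    public class TimestampRetentionSketch {
        // Minimal stand-in for a log segment: only the largest message timestamp matters here.
        record Segment(String name, Instant largestMessageTimestamp) {}

        // A segment is eligible for deletion when its newest message is older than the retention window,
        // independent of the segment file's modification time.
        static boolean eligibleForDeletion(Segment segment, Duration retention, Instant now) {
            return segment.largestMessageTimestamp().isBefore(now.minus(retention));
        }

        public static void main(String[] args) {
            Instant now = Instant.now();
            List<Segment> segments = List.of(
                    new Segment("00000000000000000000.log", now.minus(Duration.ofDays(10))),
                    new Segment("00000000000000050000.log", now.minus(Duration.ofHours(1))));
            Duration retention = Duration.ofDays(7);
            segments.forEach(s ->
                    System.out.println(s.name() + " delete=" + eligibleForDeletion(s, retention, now)));
        }
    }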
> > > > > > > > > The solution presented in the KIP attempts to handle it by replacing the replica in an atomic fashion after the log in the new dir has fully caught up with the log in the old dir. At any time the log can be considered to exist in only one log directory.

> > > > > > > > As I understand, your solution does not cover quotas. What happens if someone starts to transfer 100 partitions?

> > > > > > > Good point. Quota can be implemented in the future. It is currently mentioned as a potential future improvement in KIP-112 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>. Thanks for the reminder. I will move it to KIP-113.

> > > > > > > > > If yes, it will read a ByteBufferMessageSet from topicPartition.log and append the message set to topicPartition.move.

> > > > > > > > i.e. processPartitionData will read data from the beginning of topicPartition.log? What is the read size?
> > > > > > > > A ReplicaFetcherThread reads many partitions, so if one of them does some complicated work (= reads a lot of data from disk), everything will slow down. I think the read size should not be very big.

> > > > > > > > On the other hand, at this point (processPartitionData) one can use only the new data (the ByteBufferMessageSet from the parameters) and wait until (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset && topicPartition.move.largestOffset == topicPartition.log.largestOffset). In this case the write speed to topicPartition.move and topicPartition.log will be the same, so this will allow us to move many partitions to one disk.
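A compact sketch of that caught-up condition (the field names mirror the topicPartition.move / topicPartition.log shorthand above and are not Kafka internals): the move is complete only when the copy spans the whole live log and both logs end at the same offset.

    public class MoveCaughtUpSketch {
        // Offsets of the copy living in the destination log directory (topicPartition.move).
        long moveSmallestOffset;
        long moveLargestOffset;
        // Offsets of the live log in the source log directory (topicPartition.log).
        long logSmallestOffset;
        long logLargestOffset;

        // The swap is safe only when the copy starts no later than the live log's oldest offset
        // and ends at exactly the same offset as the live log.
        boolean moveHasCaughtUp() {
            return moveSmallestOffset <= logSmallestOffset && moveLargestOffset == logLargestOffset;
        }

        public static void main(String[] args) {
            MoveCaughtUpSketch s = new MoveCaughtUpSketch();
            s.moveSmallestOffset = 0;   s.moveLargestOffset = 1200;
            s.logSmallestOffset = 100;  s.logLargestOffset = 1200;
            System.out.println("caught up: " + s.moveHasCaughtUp());
        }
    }

Under a predicate like this, the final swap only has to re-check the condition and rename the directory, which keeps any lock very short.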
> > > > > > > The read size of a given partition is configured using replica.fetch.max.bytes, which is the same size used by the FetchRequest from follower to leader.

> > > > > > OK. Could you mention it in the KIP?

> > > > > > > If the broker is moving a replica for which it acts as a follower, the disk write rate for moving this replica is at most the rate it fetches from the leader (assuming it is catching up and has sufficient data to read from the leader), which is subject to the round-trip time between itself and the leader. Thus this part is probably fine even without a quota.

> > > > > > I think there are 2 problems:

> > > > > > 1. Without a speed limiter this will not work well even for 1 partition. In our production we had a problem, so we implemented a throughput limiter: https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713

> > > > > > 2. I don't understand how it will work in the case of a big replica.fetch.wait.max.ms and a partition with irregular flow. For example, someone could have replica.fetch.wait.max.ms=10minutes and a partition that has very high data flow from 12:00 to 13:00 and zero flow otherwise.
> > > > > > In this case processPartitionData could be called once every 10 minutes, so if we start moving the data at 13:01 it will be finished the next day.

> > > > > > > But if the broker is moving a replica for which it acts as a leader, as of the current KIP the broker will keep reading from log_dir_old and appending to log_dir_new without having to wait for the round-trip time. We probably need a quota for this in the future.
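Since both sides agree that some throttling is eventually needed, here is a deliberately simple average-rate limiter of the kind such a mover could call between chunk copies. It is a generic sketch: it is not the code in the commit linked above and not the KIP-73 quota mechanism.

    public class CopyThrottleSketch {
        private final long bytesPerSecond;
        private final long startMs = System.currentTimeMillis();
        private long totalBytes = 0;

        CopyThrottleSketch(long bytesPerSecond) {
            this.bytesPerSecond = bytesPerSecond;
        }

        // Account for bytes just copied and sleep long enough to keep the average rate under the limit.
        void throttle(long bytes) throws InterruptedException {
            totalBytes += bytes;
            long minElapsedMs = totalBytes * 1000 / bytesPerSecond;        // time the copy should have taken
            long actualElapsedMs = System.currentTimeMillis() - startMs;
            if (actualElapsedMs < minElapsedMs) {
                Thread.sleep(minElapsedMs - actualElapsedMs);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            CopyThrottleSketch throttle = new CopyThrottleSketch(50L * 1024 * 1024); // 50 MiB/s budget
            for (int chunk = 0; chunk < 10; chunk++) {
                // pretend a 1 MiB chunk of a segment was just copied to the destination log dir
                throttle.throttle(1024 * 1024);
            }
            System.out.println("copied 10 MiB without exceeding the configured rate");
        }
    }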
> > > > > > > > > And to answer your question, yes, topicpartition.log refers to topic-partition/segment.log.

> > > > > > > > > Thanks,
> > > > > > > > > Dong

> > > > > > > > > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:

> > > > > > > > > > Hi,

> > > > > > > > > > We have a similar solution that has been working in production since 2014. You can see it here: https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb

> > > > > > > > > > The idea is very simple:
> > > > > > > > > > 1. The disk balancer runs in a separate thread inside the scheduler pool.
> > > > > > > > > > 2. It does not touch empty partitions.
> > > > > > > > > > 3. Before it moves a partition it forcibly creates a new segment on the destination disk.
> > > > > > > > > > 4. It moves segment by segment from new to old.
> > > > > > > > > > 5. The Log class works with segments on both disks.

> > > > > > > > > > Your approach seems too complicated; moreover, it means that you have to patch different components of the system.

> > > > > > > > > > Could you clarify what you mean by topicPartition.log? Is it topic-partition/segment.log?

> > > > > > > > > > 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:

> > > > > > > > > > > Hi all,

> > > > > > > > > > > We created KIP-113: Support replicas movement between log directories. Please find the KIP wiki in the link <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.

> > > > > > > > > > > This KIP is related to KIP-112 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>: Handle disk failure for JBOD.
> > > > > > > > > > > They are needed in order to support JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!

> > > > > > > > > > > Thanks,
> > > > > > > > > > > Dong