Thanks, Jun! Hi all,

Thanks for all the comments. I am going to open the voting thread if there is no further concern with the KIP.

Dong

On Thu, Mar 30, 2017 at 3:19 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

I don't have further concerns. If there are no more comments from other people, we can start the vote.

Thanks,
Jun

On Thu, Mar 30, 2017 at 10:59 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the comment! Do you think we can start the vote for KIP-112 and KIP-113 if there is no further concern?

Dong

On Thu, Mar 30, 2017 at 10:40 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Ok, so it seems that in solution (2), if the tool exits successfully, then we know for sure that all replicas will be in the right log dirs. Solution (1) doesn't guarantee that. That seems better, and we can go with your current solution then.

Thanks,
Jun

On Fri, Mar 24, 2017 at 4:28 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

No... the current approach described in the KIP (see here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreassignreplicabetweenlogdirectoriesacrossbrokers) also sends ChangeReplicaDirRequest before writing the reassignment path in ZK. I think we are discussing whether ChangeReplicaDirResponse should (1) show success or (2) specify ReplicaNotAvailableException if the replica has not been created yet.

Since both solutions send ChangeReplicaDirRequest before writing the reassignment in ZK, their chance of creating the replica in the right directory is the same.

To take care of the rarer case that some brokers go down immediately after the reassignment tool is run, solution (1) requires the reassignment tool to repeatedly send DescribeDirsRequest and ChangeReplicaDirRequest, while solution (2) requires the tool to only retry ChangeReplicaDirRequest if the response says ReplicaNotAvailableException. It seems that solution (2) is cleaner because ChangeReplicaDirRequest won't depend on DescribeDirsRequest. What do you think?

Thanks,
Dong

On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

We are just comparing whether it's better for the reassignment tool to send ChangeReplicaDirRequest (1) before or (2) after writing the reassignment path in ZK.

In the case when all brokers are alive when the reassignment tool is run, (1) guarantees 100% that the new replicas will be in the right log dirs and (2) can't.

In the rarer case that some brokers go down immediately after the reassignment tool is run, in either approach there is a chance that when the failed broker comes back, it will complete the pending reassignment process by putting some replicas in the wrong log dirs.

Implementation wise, (1) and (2) seem to be the same. So, it seems to me that (1) is better?

Thanks,
Jun

On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the response! I agree with you that if multiple replicas are created in the wrong directory, we may waste resources if either the ReplicaMoveThread number is low or intra.broker.throttled.rate is slow. Then the question is whether the suggested approach increases the chance of a replica being created in the correct log directory.

I think the answer is no, due to the argument provided in the previous email. Sending ChangeReplicaDirRequest before updating the znode has negligible impact on the chance that the broker processes ChangeReplicaDirRequest before the LeaderAndIsrRequest from the controller. If we still worry about the order in which they are sent, the reassignment tool can first send ChangeReplicaDirRequest (so that the broker remembers it in memory), create the reassignment znode, and then retry ChangeReplicaDirRequest if the previous ChangeReplicaDirResponse says the replica has not been created. This should give us the highest possible chance of creating the replica in the correct directory and avoid the problem of the suggested approach. I have updated "How to reassign replica between log directories across brokers" in the KIP to explain this procedure.

To answer your question, the reassignment tool should fail with a proper error message if the user has specified a log directory for a replica on an offline broker. This is reasonable because the reassignment tool cannot guarantee that the replica will be moved to the specified log directory if the broker is offline. If all brokers are online, the reassignment tool may hang for up to 10 seconds (by default) to retry ChangeReplicaDirRequest if any replica has not been created already. The user can change this timeout value using the newly added --timeout argument of the reassignment tool. This is specified in the Public Interfaces section of the KIP. The reassignment tool will only block if the user uses this new feature of reassigning a replica to a specific log directory on the broker. Therefore it seems backward compatible.

Does this address the concern?

Thanks,
Dong
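[For illustration, the execute-phase procedure Dong describes above (send ChangeReplicaDirRequest first, create the reassignment znode, then retry on ReplicaNotAvailableException) might look roughly like the following sketch. The helper names are hypothetical stand-ins for the tool's internals, not actual KIP-113 APIs.]

    import java.util.Map;

    // Hypothetical sketch of the tool-side flow; not actual KIP-113 code.
    abstract class ReassignmentExecutor {
        // Sends ChangeReplicaDirRequest for each replica and returns the subset
        // whose broker answered ReplicaNotAvailableException (not created yet).
        abstract Map<String, String> sendChangeReplicaDirRequests(Map<String, String> replicaToDir);

        // Writes the reassignment path in ZK, triggering LeaderAndIsrRequests.
        abstract void createReassignmentZnode(Iterable<String> replicas);

        void execute(Map<String, String> replicaToDir, long timeoutMs) throws InterruptedException {
            // 1. Let each broker remember the desired log dir before the replica exists.
            Map<String, String> pending = sendChangeReplicaDirRequests(replicaToDir);
            // 2. Create the reassignment znode; the controller takes over inter-broker movement.
            createReassignmentZnode(replicaToDir.keySet());
            // 3. Retry only the replicas that have not been created yet, with a
            //    0.5 s backoff, up to the tool's --timeout (10 s by default).
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (!pending.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(500);
                pending = sendChangeReplicaDirRequests(pending);
            }
            if (!pending.isEmpty())
                throw new IllegalStateException("Replicas not yet in requested log dirs: " + pending.keySet());
        }
    }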
On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

11.2 I think there are a few reasons why the cross-disk movement may not catch up if the replicas are created in the wrong log dirs to start with: (a) there could be more replica fetcher threads than disk movement threads; (b) intra.broker.throttled.rate may be configured lower than the replica throttle rate. That's why I think getting the replicas created in the right log dirs in the first place will be better.

For the corner case issue that you mentioned, I am not sure if the approach in the KIP completely avoids it. If a broker is down when the partition reassignment tool is started, does the tool just hang (keep retrying ChangeReplicaDirRequest) until the broker comes back? Currently, the partition reassignment tool doesn't block.

Thanks,
Jun

On Tue, Mar 21, 2017 at 11:24 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks for the explanation. Please see below my thoughts.

10. I see. So you are concerned with the potential implementation complexity, which I wasn't aware of. I think it is OK not to do log cleaning on the .move log, since there can be only one such log in each directory. I have updated the KIP to specify this:

"The log segments in the topicPartition.move directory will be subject to log truncation and log retention in the same way as the log segments in the source log directory. But we may not do log cleaning on the topicPartition.move to simplify the implementation."

11.2 Now I get your point. I think we have slightly different expectations of the order in which the reassignment tool updates the reassignment node in ZK and sends ChangeReplicaDirRequest.

I think the reassignment tool should first create the reassignment znode and then keep sending ChangeReplicaDirRequest until success. Sending ChangeReplicaDirRequest before updating the znode has negligible impact on the chance that the broker processes ChangeReplicaDirRequest before the LeaderAndIsrRequest from the controller, because the time for the controller to receive the ZK notification, handle the state machine changes and send LeaderAndIsrRequests should be much longer than the time for the reassignment tool to set up a connection with the broker and send ChangeReplicaDirRequest. Even if the broker receives the LeaderAndIsrRequest a bit sooner, the data in the original replica should be small enough for the .move log to catch up very quickly, so that the broker can swap the log soon after it receives ChangeReplicaDirRequest -- otherwise the intra.broker.throttled.rate is probably too small. Does this address your concern with the performance?

One concern with the suggested approach is that the ChangeReplicaDirRequest may be lost if the broker crashes before it creates the replica. I agree it is rare, but it will be confusing when it happens. Operators would have to keep verifying the reassignment and possibly retry execution until success if they want to make sure that the ChangeReplicaDirRequest is executed.

Thanks,
Dong

On Tue, Mar 21, 2017 at 8:37 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

10. I was mainly concerned about the additional complexity needed to support log cleaning in the .move log. For example, LogToClean is keyed off TopicPartition. To be able to support cleaning different instances of the same partition, we need additional logic. I am not sure how much additional complexity is needed and whether it's worth it. If we don't do log cleaning at all on the .move log, then we don't have to change the log cleaner's code.

11.2 I was thinking of the following flow. In the execute phase, the reassignment tool first issues a ChangeReplicaDirRequest to the brokers where new replicas will be created. The brokers remember the mapping and return a success code. The reassignment tool then initiates the cross-broker movement through the controller. In the verify phase, in addition to checking the replica assignment at the brokers, it issues a DescribeDirsRequest to check the replica-to-log-dir mapping. For each partition in the response, the broker returns a state to indicate whether the replica is final, temporary or pending. If all replicas are in the final state, the tool checks if all replicas are in the expected log dirs. If they are not, it outputs a warning (and perhaps suggests that the users move the data again). However, this should be rare.

Thanks,
Jun
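[As a sketch of the verify phase Jun outlines above, with the final/temporary/pending replica states modeled as an enum. All names here are illustrative stand-ins, not the actual DescribeDirsResponse schema.]

    import java.util.Map;

    class VerifyPhase {
        enum ReplicaDirState { FINAL, TEMPORARY, PENDING }

        // Returns true once every replica is in the FINAL state; at that point,
        // a replica outside its expected log dir only triggers a warning, since
        // the user can simply move the data again.
        static boolean verify(Map<String, ReplicaDirState> stateByReplica,
                              Map<String, String> actualDirByReplica,
                              Map<String, String> expectedDirByReplica) {
            for (ReplicaDirState state : stateByReplica.values())
                if (state != ReplicaDirState.FINAL)
                    return false; // movement still in progress; check again later
            for (Map.Entry<String, String> e : expectedDirByReplica.entrySet())
                if (!e.getValue().equals(actualDirByReplica.get(e.getKey())))
                    System.err.println("Warning: " + e.getKey() + " ended up in "
                            + actualDirByReplica.get(e.getKey()) + ", expected "
                            + e.getValue() + "; consider moving the data again.");
            return true;
        }
    }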
On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks for the response! It seems that we have only two remaining issues. Please see my reply below.

On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the update. A few replies inlined below.

On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks for your comment! Please see my reply below.

On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the reply.

Jun (Mar 15): 10. Could you comment on that?

Dong (Mar 16): Sorry, I missed that comment. Good point. I think the log segments in the topicPartition.move directory will be subject to log truncation, log retention and log cleaning in the same way as the log segments in the source log directory. I have just specified this in the KIP.

Jun (Mar 20): This is ok, but it doubles the overhead of log cleaning. We probably want to think a bit more on this.

Dong (Mar 20): I think this is OK because the number of replicas that are being moved is limited by the number of ReplicaMoveThreads. The default number of ReplicaMoveThreads is the number of log directories, which means we incur this overhead for at most one replica per log directory at any time. Suppose there are more than 100 replicas in any log directory; then the increase in overhead is less than 1%.

Another way to look at this is that it is no worse than replica reassignment. When we reassign a replica from one broker to another, we double the overhead of log cleaning in the cluster for that replica. If we are OK with that, then we are OK with replica movement between log directories.

Jun (Mar 15): 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if broker restarts after it sends ChangeReplicaDirResponse but before it receives LeaderAndIsrRequest."

In that case, the reassignment tool could detect that through DescribeDirsRequest and issue ChangeReplicaDirRequest again, right? In the common case, this is probably not needed and we only need to write each replica once.

My main concern with the approach in the current KIP is that once a new replica is created in the wrong log dir, the cross-log-directory movement may not catch up until the new replica is fully bootstrapped. So, we end up writing the data for the same replica twice.

Dong (Mar 16): I agree with your concern. My main concern is that it is a bit weird if ChangeReplicaDirResponse cannot guarantee success and the tool needs to rely on DescribeDirsResponse to see if it needs to send ChangeReplicaDirRequest again.

How about this: if the broker does not already have a replica created for the specified topicPartition when it receives ChangeReplicaDirRequest, it will reply with ReplicaNotAvailableException AND remember the (replica, destination log directory) pair in memory, so as to create the replica in the specified log directory.

Jun (Mar 20): I am not sure if returning ReplicaNotAvailableException is useful. What will the client do on receiving ReplicaNotAvailableException in this case?

Perhaps we could just replace the is_temporary field in DescribeDirsResponsePartition with a state field. We can use 0 to indicate that the partition is created, 1 to indicate that it is temporary and 2 to indicate that it is pending.

Dong (Mar 20): ReplicaNotAvailableException is useful because the client can re-send ChangeReplicaDirRequest (with backoff) after receiving ReplicaNotAvailableException in the response. ChangeReplicaDirRequest will only succeed after the replica has been created for the specified partition on the broker.

I think this is cleaner than asking the reassignment tool to detect that through DescribeDirsRequest and issue ChangeReplicaDirRequest again.
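[A minimal sketch of the broker-side behavior Dong proposes here: reply with ReplicaNotAvailableException while remembering the requested destination. Simplified types stand in for the real request handling; everything below is illustrative.]

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class ChangeReplicaDirHandler {
        // Remembered (replica -> destination log dir) pairs for replicas that
        // do not exist yet; consulted when the LeaderAndIsrRequest creates them.
        private final Map<String, String> pendingDirByReplica = new ConcurrentHashMap<>();

        // Returns an error name in place of a real Kafka error code.
        String handleChangeReplicaDir(String topicPartition, String destinationDir,
                                      boolean replicaExists) {
            if (!replicaExists) {
                pendingDirByReplica.put(topicPartition, destinationDir);
                return "REPLICA_NOT_AVAILABLE"; // client re-sends with backoff
            }
            startIntraBrokerMove(topicPartition, destinationDir);
            return "NONE";
        }

        private void startIntraBrokerMove(String tp, String dir) {
            // Hand the partition off to a ReplicaMoveThread to build the .move log.
        }
    }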
Both solutions have the same chance of writing the data for the same replica twice. In the original solution, the reassignment tool will keep retrying ChangeReplicaDirRequest until success. In the suggested solution, the reassignment tool needs to send ChangeReplicaDirRequest, send DescribeDirsRequest to verify the result, and retry ChangeReplicaDirRequest and DescribeDirsRequest again if the replica hasn't been created yet. Thus the suggested solution couples ChangeReplicaDirRequest with DescribeDirsRequest and makes the tool's logic a bit more complicated.

Besides, I am not sure I understand your suggestion for the is_temporary field. It seems that a replica can have only two states: normal, when it is being used to serve fetch/produce requests, and temporary, when it is a replica that is catching up with the normal one. If you think we should have the reassignment tool send DescribeDirsRequest before retrying ChangeReplicaDirRequest, can you elaborate a bit on what the "pending" state is?

Jun (Mar 15): 11.3 Are you saying the value in --throttle will be used to set both intra.broker.throttled.rate and leader.follower.replication.throttled.replicas?

Dong (Mar 16): No. --throttle will be used only to set leader.follower.replication.throttled.replicas, as it does now. I think we do not need any option in kafka-reassign-partitions.sh to specify intra.broker.throttled.rate. The user can set it in the broker config or dynamically using kafka-configs.sh. Does this sound OK?

Jun (Mar 20): Ok. This sounds good. It would be useful to make this clear in the wiki.

Dong (Mar 20): Sure. I have updated the wiki to specify this: "the quota specified by the argument `--throttle` will be applied only to inter-broker replica reassignment. It does not affect the quota for replica movement between log directories".
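[For example, the two quotas would then be set through separate channels, roughly as follows. The command shapes are illustrative only; the exact config name and whether it is dynamically settable were still being settled in this thread.]

    # Inter-broker reassignment quota, via the tool as today:
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file reassign.json --execute --throttle 50000000

    # Intra-broker (cross-directory) quota, as a broker config instead,
    # e.g. dynamically via kafka-configs.sh:
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type brokers --entity-name 0 \
      --add-config intra.broker.throttled.rate=10485760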
Jun (Mar 15): 12.2 If the user only wants to check one topic, the tool could do the filtering on the client side, right? My concern with having both log_dirs and topics is the semantics. For example, if both are non-empty, do we return the intersection or the union?

Dong (Mar 16): Yes, the tool could filter on the client side. But the purpose of having this field is to reduce the response size in case the broker has a lot of topics. Both fields are used as filters and the result is the intersection. Do you think this semantic is confusing or counter-intuitive?

Jun (Mar 20): Ok. Could we document the semantics when both dirs and topics are specified?

Dong (Mar 20): Sure. I have updated the wiki to specify this: "log_dirs and topics are used to filter the results to include only the specified log_dir/topic. The result is the intersection of both filters".

Thanks,
Jun
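[The documented intersection semantics amount to something like this sketch. Simplified types, not the actual wire schema; a null filter is taken to mean "no filtering", matching the null-means-all convention discussed downthread.]

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    class DescribeDirsFilter {
        // A partition is returned only if it passes BOTH filters.
        static List<String[]> filter(List<String[]> dirTopicPairs, // each entry: {logDir, topic}
                                     Set<String> logDirs, Set<String> topics) {
            List<String[]> result = new ArrayList<>();
            for (String[] pair : dirTopicPairs) {
                boolean dirMatches = logDirs == null || logDirs.contains(pair[0]);
                boolean topicMatches = topics == null || topics.contains(pair[1]);
                if (dirMatches && topicMatches) // intersection of both filters
                    result.add(pair);
            }
            return result;
        }
    }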
On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for your detailed comments. Please see my reply below.

On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the updated KIP. Some more comments below.

Jun (Mar 13): 10. For the .move log, do we perform any segment deletion (based on retention) or log cleaning (if a compacted topic)? Or do we only enable that after the swap?

11. kafka-reassign-partitions.sh:

11.1 If all reassigned replicas are in the current broker and only the log directories have changed, we can probably optimize the tool to not trigger partition reassignment through the controller and only send ChangeReplicaDirRequest.

Dong (Mar 13): Yes, the reassignment script should not create the reassignment znode if no replicas need to be moved between brokers. This falls under "How to move replica between log directories on the same broker" in the Proposed Changes section.

Jun (Mar 13): 11.2 If ChangeReplicaDirRequest specifies a replica that's not created yet, could the broker just remember that in memory and create the replica when the creation is requested? This way, when doing cluster expansion, we can make sure that the new replicas on the new brokers are created in the right log directory in the first place. We can also avoid the tool having to keep issuing ChangeReplicaDirRequest in response to ReplicaNotAvailableException.

Dong (Mar 13): I am concerned that the ChangeReplicaDirRequest would be lost if the broker restarts after it sends ChangeReplicaDirResponse but before it receives the LeaderAndIsrRequest. In that case, the user would receive success when they initiate the replica reassignment, but the reassignment would never complete when they verify it later. This would be confusing to the user.

There are three different approaches to this problem if the broker has not yet created the replica when it receives ChangeReplicaDirRequest:

1) The broker immediately replies with ReplicaNotAvailableException, and the user can decide to retry later. The advantage of this solution is that the broker logic is very simple and the reassignment script logic also seems straightforward. The disadvantage is that the user script has to retry. But that seems fine - we can set the interval between retries to 0.5 sec so that the broker won't be bombarded by those requests. This is the solution chosen in the current KIP.

2) The broker can put the ChangeReplicaDirRequest in a purgatory with a timeout and reply to the user after the replica has been created. I didn't choose this in the interest of keeping the broker logic simpler.

3) The broker can remember it by making a mark on disk, e.g. creating a topicPartition.tomove directory in the destination log directory. This mark would be persisted across broker restarts. This is the first idea I had, but I replaced it with solution 1) in the interest of keeping the broker simple.

It seems that solution 1) is the simplest one that works, but I am OK with switching to the other two solutions if we don't want the retry logic. What do you think?

Jun (Mar 13): 11.3 Do we need an option in the tool to specify intra.broker.throttled.rate?

Dong (Mar 13): I don't find it useful to add this option to kafka-reassign-partitions.sh. The reason we have the "--throttle" option in the script is that we usually want a higher quota to fix an offline replica and get out of URP (under-replicated partitions), but we are OK with a lower quota if we are moving replicas only to balance the cluster. Thus it is common for SREs to use a different quota when using kafka-reassign-partitions.sh to move replicas between brokers.
However, the only reason for moving a replica between log directories of the same broker is to balance cluster resources. Thus an option to specify intra.broker.throttled.rate in the tool is not that useful. I am inclined not to add this option, to keep the tool's usage simpler.

Jun (Mar 13): 12. DescribeDirsRequest:

12.1 In other requests like CreateTopicRequest, we return an empty list in the response for an empty input list. If the input list is null, we return everything. We should probably follow the same convention here.

Dong (Mar 13): Thanks. I wasn't aware of this convention. I have changed DescribeDirsRequest so that "null" indicates "all".

Jun (Mar 13): 12.2 Do we need the topics field? Since the request is about log dirs, it makes sense to specify the log dirs. But it's weird to specify topics.

Dong (Mar 13): The topics field is not necessary. But it is useful to reduce the response size in case users are only interested in the status of a few topics. For example, a user may have initiated the reassignment of a given replica from one log directory to another on the same broker, and the user only wants to check the status of this given partition by looking at the DescribeDirsResponse. Thus this field is useful.

I am not sure if it is weird to call this request DescribeDirsRequest. The response is a map from log directory to information about some partitions on that log directory. Do you think we need to change the name of the request?
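[For reference, the response shape under discussion is roughly the following. Field and type names are illustrative only, not the actual schema.]

    import java.util.List;
    import java.util.Map;

    class DescribeDirsResponseSketch {
        static class PartitionInfo {
            String topic;
            int partition;
            long sizeInBytes;    // e.g. individual log size, instead of JMX
            boolean isTemporary; // true for a topicPartition.move log
        }
        // Map from log directory to the partitions hosted in that directory.
        Map<String, List<PartitionInfo>> partitionsByLogDir;
    }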
Jun (Mar 13): 12.3 DescribeDirsResponsePartition: Should we include firstOffset and nextOffset in the response? That could be useful to track the progress of the movement.

Dong (Mar 13): Yeah, good point. I agree it is useful to include logEndOffset in the response. According to the Log.scala doc, the logEndOffset is equivalent to the nextOffset. A user can track progress by checking the difference between the logEndOffset of the given partition in the source and destination log directories. I have added logEndOffset to the DescribeDirsResponsePartition in the KIP.

But it seems that we don't need firstOffset in the response. Do you think firstOffset is still needed?
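[As a sketch, the progress check this enables is just an offset difference, with both values as they would be read from DescribeDirsResponse. Names are illustrative.]

    class MoveProgress {
        // Offsets still to copy from the source log to the .move log.
        static long remainingOffsets(long sourceLogEndOffset, long moveLogEndOffset) {
            return Math.max(0L, sourceLogEndOffset - moveLogEndOffset);
        }

        static boolean caughtUp(long sourceLogEndOffset, long moveLogEndOffset) {
            return remainingOffsets(sourceLogEndOffset, moveLogEndOffset) == 0L;
        }
    }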
Jun (Mar 13): 13. ChangeReplicaDirResponse: Do we need an error code at both levels?

Dong (Mar 13): My bad. It is not needed. I have removed the request-level error code. I also added ChangeReplicaDirRequestTopic and ChangeReplicaDirResponseTopic to reduce duplication of the "topic" string in the request and response.

Jun (Mar 13): 14. num.replica.move.threads: Does it default to the number of log dirs?

Dong (Mar 13): No, it doesn't. I expect the default to be set to a conservative value such as 3. It may be surprising to users if the number of threads increases just because they have assigned more log directories to the Kafka broker.

It seems that the number of replica move threads doesn't have to depend on the number of log directories. It is possible to have one thread that moves replicas across all log directories. On the other hand, we can have multiple threads moving replicas to the same log directory. For example, if the broker uses SSDs, the CPU rather than disk IO may be the replica-move bottleneck, and it will be faster to move replicas using multiple threads per log directory.

Thanks,
Jun

On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:

I just made one correction in the KIP. If the broker receives a ChangeReplicaDirRequest and the replica hasn't been created there, the broker will respond with ReplicaNotAvailableException. kafka-reassign-partitions.sh will need to re-send ChangeReplicaDirRequest in this case in order to wait for the controller to send the LeaderAndIsrRequest to the broker. The previous approach of creating an empty directory seems hacky.

On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks for your comments! I have updated the KIP to address your comments. Please see my reply inline.

Can you let me know if the latest KIP has addressed your comments?

On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the reply.

Jun (Mar 8): 1.3 So the thread gets the lock, checks if it has caught up and releases the lock if not? Then, in the case when there is continuous incoming data, the thread may never get a chance to swap. One way to address this is, when the thread is getting really close to catching up, to just hold onto the lock until the thread fully catches up.

Dong (Mar 9): Yes, that was my original solution. I see your point that the lock may not be fairly assigned to the ReplicaMoveThread and the RequestHandlerThread when there are frequent incoming requests. Your solution should address the problem and I have updated the KIP to use it.

Jun (Mar 8): 2.3 So, you are saying that the partition reassignment tool can first send a ChangeReplicaDirRequest to the relevant brokers to establish the log dir for replicas not created yet, then trigger the partition movement across brokers through the controller? That's actually a good idea. Then, we can just leave LeaderAndIsrRequest as it is.

Dong (Mar 9): Yes, that is what I plan to do. If a broker receives a ChangeReplicaDirRequest while it is not leader or follower of the partition, the broker will create an empty Log instance (i.e. a directory named topicPartition) in the destination log directory, so that the replica will be placed there when the broker receives the LeaderAndIsrRequest from the controller. The broker should clean up those empty Log instances on startup, just in case a ChangeReplicaDirRequest was mistakenly sent to a broker that was not meant to be follower/leader of the partition.

Jun (Mar 8): Another thing related to ChangeReplicaDirRequest: since this request may take long to complete, I am not sure if we should wait for the movement to complete before responding. While waiting for the movement to complete, the idle connection may be killed or the client may be gone already. An alternative is to return immediately and add a new request like CheckReplicaDirRequest to see if the movement has completed. The tool can take advantage of that to check the status.

Dong (Mar 9): I agree with your concern and solution. We need a request to query the partition -> log_directory mapping on the broker. I have updated the KIP to remove the need for a ChangeReplicaDirRequestPurgatory. Instead, kafka-reassign-partitions.sh will send DescribeDirsRequest to the brokers when the user wants to verify the partition assignment. Since we need this DescribeDirsRequest anyway, we can also use it to expose stats like the individual log size instead of using JMX. One drawback of using JMX is that the user has to manage the JMX port and related credentials if they haven't already done so, which is the case at LinkedIn.
Thanks,
Jun

On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks for the detailed explanation. I will use a separate thread pool to move replicas between log directories. I will let you know when the KIP has been updated to use a separate thread pool.

Here is my response to your other questions:

1.3 My idea is that the ReplicaMoveThread that moves data should get the lock before checking whether the replica in the destination log directory has caught up. If the new replica has caught up, then the ReplicaMoveThread swaps the replicas while it is still holding the lock. The ReplicaFetcherThread or RequestHandlerThread will not be able to append data to the replica in the source log directory during this period because they cannot get the lock. Does this address the problem?
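[The swap protocol that comes out of this exchange, together with the hold-the-lock-near-catch-up refinement agreed upthread, might look roughly like this sketch. The names and the simplified Log type are illustrative only.]

    class ReplicaMover {
        private final Object logLock = new Object();
        private static final long CLOSE_ENOUGH_BYTES = 1024 * 1024;

        interface Log { long sizeInBytes(); }

        // Called repeatedly by the ReplicaMoveThread. The lock is released between
        // calls while the .move log is far behind, so appenders are not starved;
        // once close, the final catch-up and the swap happen under the lock.
        void maybeSwap(Log source, Log move) {
            synchronized (logLock) {
                if (source.sizeInBytes() - move.sizeInBytes() > CLOSE_ENOUGH_BYTES)
                    return; // still far behind; keep copying outside the lock
                while (move.sizeInBytes() < source.sizeInBytes())
                    copyNextChunk(source, move); // finish catch-up holding the lock
                swap(source, move); // rename the .move dir and retire the old log
            }
        }

        private void copyNextChunk(Log src, Log dst) { /* bounded copy step */ }
        private void swap(Log src, Log dst) { /* atomic directory rename */ }
    }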
How about this: > > > > controller > > > > > > will > > > > > > > > > only > > > > > > > > > > > deal > > > > > > > > > > > > > > with > > > > > > > > > > > > > > > > >> > reassignment across brokers as it does > > now. > > > If > > > > > > user > > > > > > > > > > > specified > > > > > > > > > > > > > > > > >> destination > > > > > > > > > > > > > > > > >> > replica for any disk, the admin tool > will > > > send > > > > > > > > > > > > > > > ChangeReplicaDirRequest > > > > > > > > > > > > > > > > >> and > > > > > > > > > > > > > > > > >> > wait for response from broker to confirm > > > that > > > > > all > > > > > > > > > replicas > > > > > > > > > > > > have > > > > > > > > > > > > > > been > > > > > > > > > > > > > > > > >> moved > > > > > > > > > > > > > > > > >> > to the destination log direcotry. The > > broker > > > > > will > > > > > > > put > > > > > > > > > > > > > > > > >> > ChangeReplicaDirRequset in a purgatory > and > > > > > respond > > > > > > > > > either > > > > > > > > > > > when > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > >> movement > > > > > > > > > > > > > > > > >> > is completed or when the request has > > > > timed-out. > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > 4. I agree that we can expose these > > metrics > > > > via > > > > > > JMX. > > > > > > > > > But I > > > > > > > > > > > am > > > > > > > > > > > > > not > > > > > > > > > > > > > > > sure > > > > > > > > > > > > > > > > >> if > > > > > > > > > > > > > > > > >> > it can be obtained easily with good > > > > performance > > > > > > > using > > > > > > > > > > either > > > > > > > > > > > > > > > existing > > > > > > > > > > > > > > > > >> tools > > > > > > > > > > > > > > > > >> > or new script in kafka. I will ask SREs > > for > > > > > their > > > > > > > > > opinion. > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > Thanks, > > > > > > > > > > > > > > > > >> > Dong > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao > < > > > > > > > > > j...@confluent.io > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > Hi, Dong, > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > Thanks for the updated KIP. A few more > > > > > comments > > > > > > > > below. > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > 1.1 and 1.2: I am still not sure there > > is > > > > > enough > > > > > > > > > benefit > > > > > > > > > > > of > > > > > > > > > > > > > > > reusing > > > > > > > > > > > > > > > > >> > > ReplicaFetchThread > > > > > > > > > > > > > > > > >> > > to move data across disks. > > > > > > > > > > > > > > > > >> > > (a) A big part of ReplicaFetchThread > is > > to > > > > > deal > > > > > > > with > > > > > > > > > > > issuing > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > >> tracking > > > > > > > > > > > > > > > > >> > > fetch requests. 
4. I agree that we can expose these metrics via JMX. But I am not sure whether they can be obtained easily, with good performance, using either existing tools or a new script in Kafka. I will ask SREs for their opinion.

Thanks,
Dong

On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the updated KIP. A few more comments below.

1.1 and 1.2: I am still not sure there is enough benefit in reusing ReplicaFetchThread to move data across disks.

(a) A big part of ReplicaFetchThread deals with issuing and tracking fetch requests. So, it doesn't feel that we get much from reusing ReplicaFetchThread only to disable the fetching part.

(b) The leader replica has no ReplicaFetchThread to start with. It feels weird to start one just for intra-broker data movement.

(c) The ReplicaFetchThread is per broker. Intuitively, the number of threads doing intra-broker data movement should be related to the number of disks in the broker, not the number of brokers in the cluster.

(d) If the destination disk fails, we want to stop the intra-broker data movement but continue inter-broker replication. So, logically, it seems better to separate out the two.

(e) I am also not sure we should reuse the existing replication throttling. It is designed to handle traffic across brokers, and the delaying is done in the fetch request. So, if we are not doing fetching in ReplicaFetchThread, I am not sure the existing throttling is effective. Also, when specifying the throttling of moving data across disks, the user shouldn't have to care about whether a replica is a leader or a follower; reusing the existing throttling config name would be awkward in this regard.
(f) It seems simpler and more consistent to use a separate thread pool for local data movement (for both leader and follower replicas). This process can then be configured (e.g. number of threads) and throttled independently.

1.3 Yes, we will need some synchronization there. So, if the movement thread catches up and gets the lock to do the swap, but realizes that new data has been added, it has to continue catching up while holding the lock?

2.3 The benefit of including the desired log directory in LeaderAndIsrRequest during partition reassignment is that the controller doesn't need to track the progress of disk movement. So, you don't need the additional BrokerDirStateUpdateRequest, and the controller never needs to issue ChangeReplicaDirRequest; only the admin tool will issue ChangeReplicaDirRequest to move data within a broker. I agree that this makes LeaderAndIsrRequest more complicated, but that seems simpler than changing the controller to track additional states during partition reassignment.

4. We want to make a decision on how to expose the stats. So far, we are exposing stats like the individual log size via JMX.
So, one way is to just add new JMX metrics that expose the log directory of individual replicas.

Thanks,

Jun

On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks for all the comments! Please see my answers below. I have updated the KIP to address most of the questions and to make it easier to understand.

Thanks,
Dong

On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the KIP. A few comments below.
>
> 1. For moving data across directories:
>
> 1.1 I am not sure why we want to use ReplicaFetcherThread to move data around in the leader. ReplicaFetchThread fetches data from a socket. For moving data locally, it seems that we want to avoid the socket overhead.

The purpose of using ReplicaFetchThread is to reuse an existing thread instead of creating more threads and making our thread model more complex.
It seems like a natural choice for copying data between disks, since that is similar to copying data between brokers. Another reason is that if the replica to be moved is a follower, we don't need a lock to swap replicas when the destination replica has caught up, since the same thread that is fetching data from the leader will swap the replica.

The ReplicaFetchThread will not incur socket overhead while copying data between disks. It will read directly from the source disk (as we do when processing a FetchRequest) and write to the destination disk (as we do when processing a ProduceRequest).

> 1.2 I am also not sure about moving data in the ReplicaFetcherThread in the follower. For example, I am not sure setting replica.fetch.wait.max.ms to 0 is ideal. It may not always be effective, since a fetch request in the ReplicaFetcherThread could be arbitrarily delayed due to replication throttling on the leader. In general, the data movement logic across disks seems different from that in ReplicaFetcherThread.
> So, I am not sure why they need to be coupled.

While it may not be the most efficient way to copy data between local disks, it will be at least as efficient as copying data from the leader to the destination disk. The expected goal of KIP-113 is to enable data movement between disks with no less efficiency than what we do now when moving data between brokers. We can optimize its performance using a separate thread if the performance turns out not to be good enough.

> 1.3 Could you add a bit more detail on how we swap the replicas when the new ones are fully caught up? For example, what happens when the new replica in the new log directory is caught up, but some new data has arrived just as we want to do the swap?

If the replica is a leader, then the ReplicaFetcherThread will perform the replacement. A proper lock is needed to prevent KafkaRequestHandler from appending data to topicPartition.log on the source disk before this replacement is completed by the ReplicaFetcherThread.
If the replica is a follower, no lock is needed, because the same ReplicaFetchThread that fetches data from the leader will also swap the replica.

I have updated the KIP to specify both cases more explicitly.

> 1.4 Do we need to do the .move at the log segment level, or could we just do that at the replica directory level? Renaming just a directory is much faster than renaming the log segments.

Great point. I have updated the KIP to rename the log directory instead (a sketch of the idea follows below).

> 1.5 Could you also describe a bit what happens when either the source or the target log directory fails while the data movement is in progress?

If the source log directory fails, the replica movement will stop and the source replica will be marked offline. If the destination log directory fails, the replica movement will stop. I have updated the KIP to clarify this.
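The directory-level rename amounts to a single filesystem metadata operation regardless of how many segments the replica has. A minimal sketch (the paths and the ".move" suffix here are illustrative, not necessarily the KIP's exact naming):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    // One rename for the whole replica directory, instead of renaming
    // every log segment file inside it.
    class DirRenameSketch {
        public static void main(String[] args) throws IOException {
            Path replicaDir = Paths.get("/data1/kafka-logs/test-0");
            Path moveDir = Paths.get("/data1/kafka-logs/test-0.move");
            Files.move(replicaDir, moveDir, StandardCopyOption.ATOMIC_MOVE);
        }
    }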
> 2. For partition reassignment:
>
> 2.1 I am not sure the controller can block on ChangeReplicaDirRequest. Data movement may take a long time to complete. If there is an outstanding request from the controller to a broker, that broker won't be able to process any new request from the controller. So if another event (e.g. a broker failure) happens while the data movement is in progress, subsequent LeaderAndIsrRequests will be delayed.

Yeah, good point. I missed the fact that there can be only one in-flight request from the controller to a broker.

How about I add a request, e.g. BrokerDirStateUpdateRequest, which maps topicPartition to log directory and can be sent from the broker to the controller to indicate completion?

> 2.2 In the KIP, the partition reassignment tool is also used for cases where an admin just wants to balance the existing data across log directories in the broker. In this case, it seems overkill to have the process go through the controller. A simpler approach is to issue an RPC request to the broker directly.
I agree we can optimize this case. It is just that we would have to add new logic or a new code path to handle a scenario that is already covered by the more complicated scenario. I will add it to the KIP.

> 2.3 When using the partition reassignment tool to move replicas across brokers, it makes sense to be able to specify the log directory of the newly created replicas. The KIP does that in two separate requests, ChangeReplicaDirRequest and LeaderAndIsrRequest, and tracks the progress of each independently. An alternative is to do it just in LeaderAndIsrRequest. That way, the new replicas will be created in the right log dir in the first place, and the controller just needs to track the progress of partition reassignment in the current way.

I agree it is better to use one request instead of two to request replica movement between disks.
But I think the performance advantage of doing so is negligible, because we trigger replica reassignment much less often than all other kinds of events in the Kafka cluster. I am not sure the benefit is worth the effort of adding an optional string field to the LeaderAndIsrRequest. Also, if we add this optional field to the LeaderAndIsrRequest, we probably want to remove ChangeReplicaDirRequest to avoid having two requests doing the same thing. But that means a user script could not send a request directly to the broker to trigger replica movement between log directories.

I will do it if you feel strongly about this optimization.

> 3. /admin/reassign_partitions: Including the log dir in every replica may not be efficient. We could include a list of log directories and reference the index of the log directory in each replica.

Good point. I have updated the KIP to use this solution.
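For illustration, the znode content might then look roughly like this; the field names are my guess at the shape, not the KIP's final format:

    // Illustrative only: a shared "log_dirs" list with per-replica indexes into it.
    class ReassignmentJsonSketch {
        static final String EXAMPLE = """
            {"version": 1,
             "log_dirs": ["/data1/kafka-logs", "/data2/kafka-logs"],
             "partitions": [
               {"topic": "test", "partition": 0,
                "replicas": [1, 2],
                "log_dir_indexes": [0, 1]}
             ]}
            """;
    }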
> 4. DescribeDirsRequest: The stats in the request are already available from JMX. Do we need the new request?

Does JMX also include the state (i.e. offline or online) of each log directory and the log directory of each replica? If not, then maybe we still need DescribeDirsRequest?

> 5. We want to be consistent on ChangeReplicaDirRequest vs. ChangeReplicaRequest.

I think ChangeReplicaRequest and ChangeReplicaResponse were my typo. Sorry, they are fixed now.

> Thanks,
>
> Jun

On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Alexey,

Thanks for all the comments!

I have updated the KIP to specify how we enforce the quota. I also updated the section "The thread model and broker logic for moving replica data between log directories" to make it easier to read. You can find the exact change here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>. The idea is to use the same replication quota mechanism introduced in KIP-73.
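As a rough illustration of what such a throttle amounts to inside the copy loop, here is a simple average-rate meter of my own; it is not the KIP-73 implementation:

    // Sketch: the replica-move thread calls acquire() before copying each chunk.
    class MoveThrottleSketch {
        private final long bytesPerSec;
        private final long startMs = System.currentTimeMillis();
        private long bytesSoFar = 0;

        MoveThrottleSketch(long bytesPerSec) { this.bytesPerSec = bytesPerSec; }

        void acquire(int chunkBytes) throws InterruptedException {
            bytesSoFar += chunkBytes;
            long elapsedMs = Math.max(System.currentTimeMillis() - startMs, 1);
            long allowedBytes = bytesPerSec * elapsedMs / 1000;
            if (bytesSoFar > allowedBytes) {
                // Sleep just long enough for the average rate to drop under the quota.
                Thread.sleep((bytesSoFar - allowedBytes) * 1000 / bytesPerSec);
            }
        }
    }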
Thanks,
Dong

On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:

24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:

> Hey Alexey,
>
> Thanks. I think we agreed that the suggested solution doesn't work in general for Kafka users. To answer your questions:
>
> 1. I agree we need a quota to rate-limit replica movement when a broker is moving a "leader" replica. I will come up with a solution, probably by reusing the replication quota config introduced in KIP-73.
>
> 2. Good point. I agree that this is a problem in general. If there is no new data on that broker, then with the current default values of replica.fetch.wait.max.ms and replica.fetch.max.bytes, the replica will be moved at only 2 MBps throughput.
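> (For reference, that figure follows from the defaults, assuming replica.fetch.max.bytes = 1 MB and replica.fetch.wait.max.ms = 500 ms: with no new data, each fetch returns at most 1 MB after waiting the full 500 ms, i.e. about 1 MB / 0.5 s = 2 MBps.)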
> I think the solution is for the broker to set replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding ReplicaFetcherThread needs to move some replica to another disk.
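A sketch of that check; the flag and the method are made up for illustration:

    // Drop the fetch wait time to zero while this fetcher still has local data to copy.
    class FetchWaitSketch {
        static int effectiveMaxWaitMs(boolean moveInProgressOnThisThread,
                                      int configuredMaxWaitMs) {
            // Don't let the leader park our fetch for replica.fetch.wait.max.ms when
            // the real bottleneck is the local copy; return immediately instead.
            return moveInProgressOnThisThread ? 0 : configuredMaxWaitMs;
        }
    }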
> 3. I have updated the KIP to mention that the read size of a given partition is configured using replica.fetch.max.bytes when we move replicas between disks.
>
> Please see this <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5> for the change to the KIP. I will come up with a solution to throttle replica movement when a broker is moving a "leader" replica.

Thanks. It looks great.

On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:

23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:

> Thanks. Please see my comment inline.
>
> On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
>
> > 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:
> >
> > > Hey Alexey,
> > >
> > > Thanks for your review and the alternative approach. Here is my understanding of your patch: Kafka's background threads are used to move data between replicas. When data movement is triggered, the log will be rolled, the new logs will be put in the new directory, and background threads will move the segments from the old directory to the new directory (a sketch follows below).
> > >
> > > It is important to note that KIP-112 is intended to work with KIP-113 to support JBOD.
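For concreteness, a sketch of the background move just described; this is my illustration, not Alexey's actual patch:

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // New segments are already written to the new directory; a background
    // thread drains the old one.
    class SegmentMoveSketch {
        static void moveOldSegments(Path oldDir, Path newDir) throws IOException {
            try (DirectoryStream<Path> segments = Files.newDirectoryStream(oldDir, "*.log")) {
                for (Path segment : segments) {
                    // Copy, then delete, one segment at a time, so a crash mid-move
                    // leaves every segment intact in at least one directory.
                    Files.copy(segment, newDir.resolve(segment.getFileName()));
                    Files.delete(segment);
                }
            }
        }
    }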
> > > I think your solution is definitely simpler and better under the current Kafka implementation, where a broker will fail if any disk fails. But I am not sure we want to allow a broker to run with a partial disk failure. Let's say a replica is being moved from log_dir_old to log_dir_new, and then log_dir_old stops working due to a disk failure. How would your existing patch handle it? To make the scenario a bit more

> > We will lose log_dir_old. After broker restart we can read the data from log_dir_new.

> No, you probably can't. This is because the broker doesn't have *all* the data for this partition.
> For example, say the broker has partition_segment_1, partition_segment_50 and partition_segment_100 in log_dir_old. partition_segment_100, which has the latest data, has been moved to log_dir_new, and log_dir_old fails before partition_segment_50 and partition_segment_1 are moved to log_dir_new. When the broker restarts, it won't have partition_segment_50. This causes a problem if the broker is elected leader and a consumer wants to consume the data in partition_segment_1.

Right.

> > > complicated, let's say the broker is shut down, log_dir_old's disk fails, and the broker starts. In this case the broker doesn't even know whether log_dir_new has all the data needed for this replica.
> > > It becomes a problem if the broker is elected leader of this partition in this case.

> > log_dir_new contains the most recent data, so we will lose the tail of the partition. This is not a big problem for us because we already delete tails by hand (see https://issues.apache.org/jira/browse/KAFKA-1712). Also, we don't use automatic leader balancing (auto.leader.rebalance.enable=false), so this partition becomes the leader with a low probability. I think my patch can be modified to prohibit leader election until the partition has moved completely.

> I guess you are saying that you have deleted the tails by hand in your own Kafka branch. But KAFKA-1712 is not accepted into Kafka trunk, and I am not

No.
We just modify segment mtimes with a cron job. This works with vanilla Kafka.

> sure it is the right solution. How would this solution address the problem mentioned above?

If you need only fresh data, and you remove old data by hand, this is not a problem. But in the general case this is a problem, of course.

> BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to address its problem. Now that we have a timestamp in the message, we can use that to delete old segments instead of relying on the log segment mtime. Just an idea; we don't have to discuss this problem here.
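For illustration, retention keyed off message timestamps rather than file mtime could look roughly like this (the names are made up):

    // The decision depends only on the data itself, so a cron job touching mtime
    // can no longer defeat (or falsely trigger) retention.
    class TimestampRetentionSketch {
        static boolean shouldDelete(long segmentMaxMessageTimestampMs,
                                    long nowMs, long retentionMs) {
            return nowMs - segmentMaxMessageTimestampMs > retentionMs;
        }
    }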
> > > The solution presented in the KIP attempts to handle it by replacing the replica in an atomic fashion after the log in the new directory has fully caught up with the log in the old directory. At any time the log can be considered to exist in only one log directory.

> > As I understand it, your solution does not cover quotas. What happens if someone starts to transfer 100 partitions?

> Good point. A quota can be implemented in the future. It is currently mentioned as a potential future improvement in KIP-112 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>. Thanks for the reminder. I will move it to KIP-113.
> > > > If yes, it will read a ByteBufferMessageSet from
> > > > topicPartition.log and append the message set to
> > > > topicPartition.move.
>
> > > i.e. processPartitionData will read data from the beginning of
> > > topicPartition.log? What is the read size? A ReplicaFetchThread
> > > reads many partitions, so if one of them does some complicated
> > > work (= reads a lot of data from disk) everything will slow down.
> > > I think the read size should not be very big.
> > >
> > > On the other hand, at this point (processPartitionData) one can
> > > use only the new data (the ByteBufferMessageSet from the
> > > parameters) and wait until
> > > (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
> > > && topicPartition.move.largestOffset == topicPartition.log.largestOffset).
> > > In this case the write speed to topicPartition.move and
> > > topicPartition.log will be the same, so this will allow us to
> > > move many partitions to one disk.
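Expressed as code, that catch-up test is roughly the following Scala
sketch (LogView is an illustrative stand-in exposing only the two
offsets; it is not a real Kafka class):

    // Minimal view of a log: the offset range it currently holds.
    case class LogView(smallestOffset: Long, largestOffset: Long)

    // The copy in the destination dir may replace the original only
    // once it covers at least the same offset range: no old data is
    // missing and it has caught up to the tip of the log.
    def readyToSwap(log: LogView, move: LogView): Boolean =
      move.smallestOffset <= log.smallestOffset &&
        move.largestOffset == log.largestOffset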
> > The read size of a given partition is configured using
> > replica.fetch.max.bytes, which is the same size used by the
> > FetchRequest from follower to leader. If the broker is moving a
> > replica for which it acts as a follower, the disk write rate for
> > moving this replica is at most the rate at which it fetches from
> > the leader (assuming it is catching up and has sufficient data to
> > read from the leader), which is subject to the round-trip time
> > between itself and the leader. Thus this part is probably fine even
> > without quota.
>
> OK. Could you mention it in the KIP?
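As a back-of-envelope check on that argument: a follower-driven move
receives at most replica.fetch.max.bytes per fetch round trip, so its
disk write rate is capped by fetch throughput. A sketch with
illustrative numbers (1 MiB is the config's default; the round-trip
time is assumed):

    // Upper bound on the disk write rate of a follower-driven move.
    val replicaFetchMaxBytes = 1048576L   // 1 MiB, the default
    val roundTripMs = 5L                  // assumed network round trip
    val maxWriteBytesPerSec = replicaFetchMaxBytes * 1000 / roundTripMs
    // = 200 MiB/s here; a slower network lowers the cap further.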
> I think there are two problems:
>
> 1. Without a speed limiter this will not work well even for one
> partition. In our production we had a problem, so we did the
> throughput limiter:
> https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
>
> 2. I don't understand how it will work in the case of a big
> replica.fetch.wait.max.ms and a partition with irregular flow. For
> example, someone could have replica.fetch.wait.max.ms = 10 minutes
> and a partition that has very high data flow from 12:00 to 13:00 and
> zero flow otherwise. In this case processPartitionData could be
> called once per 10 minutes, so if we start moving data at 13:01 the
> move will finish the next day.
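For illustration of point 1, a minimal sleep-based rate limiter
sketch in Scala (a simplified stand-in, not the limiter from the
commit linked above):

    // Callers report how many bytes they just copied; the throttler
    // sleeps long enough to hold the average rate to bytesPerSec.
    class Throttler(bytesPerSec: Long) {
      private var windowStartMs = System.currentTimeMillis()
      private var bytesInWindow = 0L

      def maybeThrottle(justCopied: Long): Unit = synchronized {
        bytesInWindow += justCopied
        val elapsedMs = System.currentTimeMillis() - windowStartMs
        // How long these bytes should have taken at the target rate.
        val expectedMs = bytesInWindow * 1000 / bytesPerSec
        if (expectedMs > elapsedMs)
          Thread.sleep(expectedMs - elapsedMs)
        // Start a fresh accounting window roughly every second.
        if (System.currentTimeMillis() - windowStartMs >= 1000) {
          windowStartMs = System.currentTimeMillis()
          bytesInWindow = 0L
        }
      }
    }

A mover thread would call maybeThrottle after each chunk it copies,
so one bulk move cannot saturate a disk shared with live traffic.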
> > But if the broker is moving a replica for which it acts as a
> > leader, as of the current KIP the broker will keep reading from
> > log_dir_old and appending to log_dir_new without having to wait for
> > a round trip. We probably need a quota for this in the future.
>
> > > > And to answer your question: yes, topicPartition.log refers to
> > > > topic-partition/segment.log.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky
> > > > <aozerit...@yandex.ru> wrote:
>
> > > > > Hi,
> > > > >
> > > > > We have a similar solution that has been working in
> > > > > production since 2014. You can see it here:
> > > > > https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> > > > >
> > > > > The idea is very simple:
> > > > >
> > > > > 1. The disk balancer runs in a separate thread inside the
> > > > > scheduler pool.
> > > > > 2. It does not touch empty partitions.
> > > > > 3. Before it moves a partition, it forcibly creates a new
> > > > > segment on the destination disk.
> > > > > 4. It moves segment by segment from new to old.
> > > > > 5. The Log class works with segments on both disks.
> > > > >
> > > > > Your approach seems too complicated; moreover, it means that
> > > > > you have to patch different components of the system.
> > > > >
> > > > > Could you clarify what you mean by topicPartition.log? Is it
> > > > > topic-partition/segment.log?
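As a rough illustration of that five-step scheme, a Scala sketch
(every name below is a hypothetical stand-in, not code from the
commit above):

    import java.nio.file.{Files, Path}

    // One balancer pass for a single partition. rollNewSegment stands
    // in for whatever forces the log to start a fresh active segment
    // on the destination disk (step 3).
    def movePartition(segments: Seq[Path], destDir: Path,
                      rollNewSegment: () => Unit): Unit = {
      if (segments.isEmpty) return     // step 2: skip empty partitions
      rollNewSegment()                 // step 3: new writes land on destDir
      // Step 4: move one closed segment at a time. Step 5 assumes the
      // Log class can serve reads from segments on either disk while
      // the move is in progress.
      for (seg <- segments)
        Files.move(seg, destDir.resolve(seg.getFileName))
    }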
> > > > > 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
>
> > > > > > Hi all,
> > > > > >
> > > > > > We created KIP-113: Support replicas movement between log
> > > > > > directories. Please find the KIP wiki at
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
> > > > > >
> > > > > > This KIP is related to KIP-112
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> > > > > > Handle disk failure for JBOD. Both are needed in order to
> > > > > > support JBOD in Kafka. Please help review the KIP. Your
> > > > > > feedback is appreciated!
> > > > > >
> > > > > > Thanks,
> > > > > > Dong