Hi, Dong,

Thanks for the update. A few replies inlined below.

On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks for your comment! Please see my reply below.
>
> On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Thanks for the reply.
> >
> > 10. Could you comment on that?
> >
>
> Sorry, I missed that comment.
>
> Good point. I think the log segments in topicPartition.move directory will
> be subject to log truncation, log retention and log cleaning in the same
> way as the log segments in the source log directory. I just specified this
> inthe KIP.
>
>
This is ok, but doubles the overhead of log cleaning. We probably want to
think a bit more on this.


>
> >
> > 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
> > broker
> > restarts after it sends ChangeReplicaDirResponse but before it receives
> > LeaderAndIsrRequest."
> >
> > In that case, the reassignment tool could detect that through
> > DescribeDirsRequest
> > and issue ChangeReplicaDirRequest again, right? In the common case, this
> is
> > probably not needed and we only need to write each replica once.
> >
> > My main concern with the approach in the current KIP is that once a new
> > replica is created in the wrong log dir, the cross log directory movement
> > may not catch up until the new replica is fully bootstrapped. So, we end
> up
> > writing the data for the same replica twice.
> >
>
> I agree with your concern. My main concern is that it is a bit weird if
> ChangeReplicaDirResponse can not guarantee success and the tool needs to
> rely on DescribeDirResponse to see if it needs to send
> ChangeReplicaDirRequest again.
>
> How about this: If broker doesn't not have already replica created for the
> specified topicParition when it receives ChangeReplicaDirRequest, it will
> reply ReplicaNotAvailableException AND remember (replica, destination log
> directory) pair in memory to create the replica in the specified log
> directory.
>
>
I am not sure if returning ReplicaNotAvailableException is useful? What
will the client do on receiving ReplicaNotAvailableException in this case?

Perhaps we could just replace the is_temporary field in
DescribeDirsRresponsePartition with a state field. We can use 0 to indicate
the partition is created, 1 to indicate the partition is temporary and 2 to
indicate that the partition is pending.


> >
> > 11.3 Are you saying the value in --throttle will be used to set both
> > intra.broker.throttled.rate and leader.follower.replication.
> > throttled.replicas?
> >
>
> No. --throttle will be used to only to set leader.follower.replication as
> it does now. I think we do not need any option in the
> kafka-reassignment-partitions.sh to specify intra.broker.throttled.rate.
> User canset it in broker config or dynamically using kafka-config.sh. Does
> this sound OK?
>
>
Ok. This sounds good. It would be useful to make this clear in the wiki.


>
> >
> > 12.2 If the user only wants to check one topic, the tool could do the
> > filtering on the client side, right? My concern with having both log_dirs
> > and topics is the semantic. For example, if both are not empty, do we
> > return the intersection or the union?
> >
>
> Yes the tool could filter on the client side. But the purpose of having
> this field is to reduce response side in case broker has a lot of topics.
> The both fields are used as filter and the result is intersection. Do you
> think this semantic is confusing or counter-intuitive?


>

Ok. Could we document the semantic when both dirs and topics are specified?

Thanks,

Jun

>
> >
> > On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for your detailed comments. Please see my reply below.
> > >
> > > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the updated KIP. Some more comments below.
> > > >
> > > > 10. For the .move log, do we perform any segment deletion (based on
> > > > retention) or log cleaning (if a compacted topic)? Or do we only
> enable
> > > > that after the swap?
> > > >
> > > > 11. kafka-reassign-partitions.sh
> > > > 11.1 If all reassigned replicas are in the current broker and only
> the
> > > log
> > > > directories have changed, we can probably optimize the tool to not
> > > trigger
> > > > partition reassignment through the controller and only
> > > > send ChangeReplicaDirRequest.
> > > >
> > >
> > > Yes, the reassignment script should not create the reassignment znode
> if
> > no
> > > replicas are not be moved between brokers. This falls into the "How to
> > move
> > > replica between log directories on the same broker" of the Proposed
> > Change
> > > section.
> > >
> > >
> > > > 11.2 If ChangeReplicaDirRequest specifies a replica that's not
> created
> > > yet,
> > > > could the broker just remember that in memory and create the replica
> > when
> > > > the creation is requested? This way, when doing cluster expansion, we
> > can
> > > > make sure that the new replicas on the new brokers are created in the
> > > right
> > > > log directory in the first place. We can also avoid the tool having
> to
> > > keep
> > > > issuing ChangeReplicaDirRequest in response to
> > > > ReplicaNotAvailableException.
> > > >
> > >
> > > I am concerned that the ChangeReplicaDirRequest would be lost if broker
> > > restarts after it sends ChangeReplicaDirResponse but before it receives
> > > LeaderAndIsrRequest. In this case, the user will receive success when
> > they
> > > initiate replica reassignment, but replica reassignment will never
> > complete
> > > when they verify the reassignment later. This would be confusing to
> user.
> > >
> > > There are three different approaches to this problem if broker has not
> > > created replica yet after it receives ChangeReplicaDirResquest:
> > >
> > > 1) Broker immediately replies to user with ReplicaNotAvailableException
> > and
> > > user can decide to retry again later. The advantage of this solution is
> > > that the broker logic is very simple and the reassignment script logic
> > also
> > > seems straightforward. The disadvantage is that user script has to
> retry.
> > > But it seems fine - we can set interval between retries to be 0.5 sec
> so
> > > that broker want be bombarded by those requests. This is the solution
> > > chosen in the current KIP.
> > >
> > > 2) Broker can put ChangeReplicaDirRequest in a purgatory with timeout
> and
> > > replies to user after the replica has been created. I didn't choose
> this
> > in
> > > the interest of keeping broker logic simpler.
> > >
> > > 3) Broker can remember that by making a mark in the disk, e.g. create
> > > topicPartition.tomove directory in the destination log directory. This
> > mark
> > > will be persisted across broker restart. This is the first idea I had
> > but I
> > > replaced it with solution 1) in the interest of keeping broker simple.
> > >
> > > It seems that solution 1) is the simplest one that works. But I am OK
> to
> > > switch to the other two solutions if we don't want the retry logic.
> What
> > do
> > > you think?
> > >
> > >
> > > 11.3 Do we need an option in the tool to specify intra.broker.
> > > > throttled.rate?
> > > >
> > >
> > > I don't find it useful to add this option to
> > kafka-reassign-partitions.sh.
> > > The reason we have the option "--throttle" in the script to throttle
> > > replication rate is that we usually want higher quota to fix an offline
> > > replica to get out of URP. But we are OK to have a lower quota if we
> are
> > > moving replica only to balance the cluster. Thus it is common for SRE
> to
> > > use different quota when using kafka-reassign-partitions.sh to move
> > replica
> > > between brokers.
> > >
> > > However, the only reason for moving replica between log directories of
> > the
> > > same broker is to balance cluster resource. Thus the option to
> > > specify intra.broker.throttled.rate in the tool is not that useful. I
> am
> > > inclined not to add this option to keep this tool's usage simpler.
> > >
> > >
> > > >
> > > > 12. DescribeDirsRequest
> > > > 12.1 In other requests like CreateTopicRequest, we return an empty
> list
> > > in
> > > > the response for an empty input list. If the input list is null, we
> > > return
> > > > everything. We should probably follow the same convention here.
> > > >
> > >
> > > Thanks. I wasn't aware of this convention. I have change
> > > DescribeDirsRequest so that "null" indicates "all".
> > >
> > >
> > > > 12.2 Do we need the topics field? Since the request is about log
> dirs,
> > it
> > > > makes sense to specify the log dirs. But it's weird to specify
> topics.
> > > >
> > >
> > > The topics field is not necessary. But it is useful to reduce the
> > response
> > > size in case user are only interested in the status of a few topics.
> For
> > > example, user may have initiated the reassignment of a given replica
> from
> > > one log directory to another log directory on the same broker, and the
> > user
> > > only wants to check the status of this given partition by looking
> > > at DescribeDirsResponse. Thus this field is useful.
> > >
> > > I am not sure if it is weird to call this request DescribeDirsRequest.
> > The
> > > response is a map from log directory to information to some partitions
> on
> > > the log directory. Do you think we need to change the name of the
> > request?
> > >
> > >
> > > > 12.3 DescribeDirsResponsePartition: Should we include firstOffset and
> > > > nextOffset in the response? That could be useful to track the
> progress
> > of
> > > > the movement.
> > > >
> > >
> > > Yeah good point. I agree it is useful to include logEndOffset in the
> > > response. According to Log.scala doc the logEndOffset is equivalent to
> > the
> > > nextOffset. User can track progress by checking the difference between
> > > logEndOffset of the given partition in the source and destination log
> > > directories. I have added logEndOffset to the
> > DescribeDirsResponsePartition
> > > in the KIP.
> > >
> > > But it seems that we don't need firstOffset in the response. Do you
> think
> > > firstOffset is still needed?
> > >
> > >
> > > >
> > > > 13. ChangeReplicaDirResponse: Do we need error code at both levels?
> > > >
> > >
> > > My bad. It is not needed. I have removed request level error code. I
> also
> > > added ChangeReplicaDirRequestTopic and ChangeReplicaDirResponseTopic to
> > > reduce duplication of the "topic" string in the request and response.
> > >
> > >
> > > >
> > > > 14. num.replica.move.threads: Does it default to # log dirs?
> > > >
> > >
> > > No. It doesn't. I expect default number to be set to a conservative
> value
> > > such as 3. It may be surprising to user if the number of threads
> increase
> > > just because they have assigned more log directories to Kafka broker.
> > >
> > > It seems that the number of replica move threads doesn't have to depend
> > on
> > > the number of log directories. It is possible to have one thread that
> > moves
> > > replicas across all log directories. On the other hand we can have
> > multiple
> > > threads to move replicas to the same log directory. For example, if
> > broker
> > > uses SSD, the CPU instead of disk IO may be the replica move bottleneck
> > and
> > > it will be faster to move replicas using multiple threads per log
> > > directory.
> > >
> > >
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > I just made one correction in the KIP. If broker receives
> > > > > ChangeReplicaDirRequest and the replica hasn't been created there,
> > the
> > > > > broker will respond ReplicaNotAvailableException.
> > > > > The kafka-reassignemnt-partitions.sh will need to re-send
> > > > > ChangeReplicaDirRequest in this case in order to wait for
> controller
> > to
> > > > > send LeaderAndIsrRequest to broker. The previous approach of
> creating
> > > an
> > > > > empty directory seems hacky.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks for your comments! I have updated the KIP to address your
> > > > > comments.
> > > > > > Please see my reply inline.
> > > > > >
> > > > > > Can you let me know if the latest KIP has addressed your
> comments?
> > > > > >
> > > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > >> Hi, Dong,
> > > > > >>
> > > > > >> Thanks for the reply.
> > > > > >>
> > > > > >> 1.3 So the thread gets the lock, checks if caught up and
> releases
> > > the
> > > > > lock
> > > > > >> if not? Then, in the case when there is continuous incoming
> data,
> > > the
> > > > > >> thread may never get a chance to swap. One way to address this
> is
> > > when
> > > > > the
> > > > > >> thread is getting really close in catching up, just hold onto
> the
> > > lock
> > > > > >> until the thread fully catches up.
> > > > > >>
> > > > > >
> > > > > > Yes, that was my original solution. I see your point that the
> lock
> > > may
> > > > > not
> > > > > > be fairly assigned to ReplicaMoveThread and RequestHandlerThread
> > when
> > > > > there
> > > > > > is frequent incoming requets. You solution should address the
> > problem
> > > > > and I
> > > > > > have updated the KIP to use it.
> > > > > >
> > > > > >
> > > > > >>
> > > > > >> 2.3 So, you are saying that the partition reassignment tool can
> > > first
> > > > > send
> > > > > >> a ChangeReplicaDirRequest to relevant brokers to establish the
> log
> > > dir
> > > > > for
> > > > > >> replicas not created yet, then trigger the partition movement
> > across
> > > > > >> brokers through the controller? That's actually a good idea.
> Then,
> > > we
> > > > > can
> > > > > >> just leave LeaderAndIsrRequest as it is.
> > > > > >
> > > > > >
> > > > > > Yes, that is what I plan to do. If broker receives a
> > > > > > ChangeReplicaDirRequest while it is not leader or follower of the
> > > > > > partition, the broker will create an empty Log instance (i.e. a
> > > > directory
> > > > > > named topicPartition) in the destination log directory so that
> the
> > > > > replica
> > > > > > will be placed there when broker receives LeaderAndIsrRequest
> from
> > > the
> > > > > > broker. The broker should clean up empty those Log instances on
> > > startup
> > > > > > just in case a ChangeReplicaDirRequest was mistakenly sent to a
> > > broker
> > > > > that
> > > > > > was not meant to be follower/leader of the partition..
> > > > > >
> > > > > >
> > > > > >> Another thing related to
> > > > > >> ChangeReplicaDirRequest.
> > > > > >> Since this request may take long to complete, I am not sure if
> we
> > > > should
> > > > > >> wait for the movement to complete before respond. While waiting
> > for
> > > > the
> > > > > >> movement to complete, the idle connection may be killed or the
> > > client
> > > > > may
> > > > > >> be gone already. An alternative is to return immediately and
> add a
> > > new
> > > > > >> request like CheckReplicaDirRequest to see if the movement has
> > > > > completed.
> > > > > >> The tool can take advantage of that to check the status.
> > > > > >>
> > > > > >
> > > > > > I agree with your concern and solution. We need request to query
> > the
> > > > > > partition -> log_directory mapping on the broker. I have updated
> > the
> > > > KIP
> > > > > to
> > > > > > remove need for ChangeReplicaDirRequestPurgatory.
> > > > > > Instead, kafka-reassignemnt-partitions.sh will send
> > > > DescribeDirsRequest
> > > > > > to brokers when user wants to verify the partition assignment.
> > Since
> > > we
> > > > > > need this DescribeDirsRequest anyway, we can also use this
> request
> > to
> > > > > > expose stats like the individual log size instead of using JMX.
> One
> > > > > > drawback of using JMX is that user has to manage the JMX port and
> > > > related
> > > > > > credentials if they haven't already done this, which is the case
> at
> > > > > > LinkedIn.
> > > > > >
> > > > > >
> > > > > >> Thanks,
> > > > > >>
> > > > > >> Jun
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >>
> > > > > >> > Hey Jun,
> > > > > >> >
> > > > > >> > Thanks for the detailed explanation. I will use the separate
> > > thread
> > > > > >> pool to
> > > > > >> > move replica between log directories. I will let you know when
> > the
> > > > KIP
> > > > > >> has
> > > > > >> > been updated to use a separate thread pool.
> > > > > >> >
> > > > > >> > Here is my response to your other questions:
> > > > > >> >
> > > > > >> > 1.3 My idea is that the ReplicaMoveThread that moves data
> should
> > > get
> > > > > the
> > > > > >> > lock before checking whether the replica in the destination
> log
> > > > > >> directory
> > > > > >> > has caught up. If the new replica has caught up, then the
> > > > > >> ReplicaMoveThread
> > > > > >> > should swaps the replica while it is still holding the lock.
> The
> > > > > >> > ReplicaFetcherThread or RequestHandlerThread will not be able
> to
> > > > > append
> > > > > >> > data to the replica in the source replica during this period
> > > because
> > > > > >> they
> > > > > >> > can not get the lock. Does this address the problem?
> > > > > >> >
> > > > > >> > 2.3 I get your point that we want to keep controller simpler.
> If
> > > > admin
> > > > > >> tool
> > > > > >> > can send ChangeReplicaDirRequest to move data within a broker,
> > > then
> > > > > >> > controller probably doesn't even need to include log directory
> > > path
> > > > in
> > > > > >> the
> > > > > >> > LeaderAndIsrRequest. How about this: controller will only deal
> > > with
> > > > > >> > reassignment across brokers as it does now. If user specified
> > > > > >> destination
> > > > > >> > replica for any disk, the admin tool will send
> > > > ChangeReplicaDirRequest
> > > > > >> and
> > > > > >> > wait for response from broker to confirm that all replicas
> have
> > > been
> > > > > >> moved
> > > > > >> > to the destination log direcotry. The broker will put
> > > > > >> > ChangeReplicaDirRequset in a purgatory and respond either when
> > the
> > > > > >> movement
> > > > > >> > is completed or when the request has timed-out.
> > > > > >> >
> > > > > >> > 4. I agree that we can expose these metrics via JMX. But I am
> > not
> > > > sure
> > > > > >> if
> > > > > >> > it can be obtained easily with good performance using either
> > > > existing
> > > > > >> tools
> > > > > >> > or new script in kafka. I will ask SREs for their opinion.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Dong
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > > >> >
> > > > > >> > > Hi, Dong,
> > > > > >> > >
> > > > > >> > > Thanks for the updated KIP. A few more comments below.
> > > > > >> > >
> > > > > >> > > 1.1 and 1.2: I am still not sure there is enough benefit of
> > > > reusing
> > > > > >> > > ReplicaFetchThread
> > > > > >> > > to move data across disks.
> > > > > >> > > (a) A big part of ReplicaFetchThread is to deal with issuing
> > and
> > > > > >> tracking
> > > > > >> > > fetch requests. So, it doesn't feel that we get much from
> > > reusing
> > > > > >> > > ReplicaFetchThread
> > > > > >> > > only to disable the fetching part.
> > > > > >> > > (b) The leader replica has no ReplicaFetchThread to start
> > with.
> > > It
> > > > > >> feels
> > > > > >> > > weird to start one just for intra broker data movement.
> > > > > >> > > (c) The ReplicaFetchThread is per broker. Intuitively, the
> > > number
> > > > of
> > > > > >> > > threads doing intra broker data movement should be related
> to
> > > the
> > > > > >> number
> > > > > >> > of
> > > > > >> > > disks in the broker, not the number of brokers in the
> cluster.
> > > > > >> > > (d) If the destination disk fails, we want to stop the intra
> > > > broker
> > > > > >> data
> > > > > >> > > movement, but want to continue inter broker replication. So,
> > > > > >> logically,
> > > > > >> > it
> > > > > >> > > seems it's better to separate out the two.
> > > > > >> > > (e) I am also not sure if we should reuse the existing
> > > throttling
> > > > > for
> > > > > >> > > replication. It's designed to handle traffic across brokers
> > and
> > > > the
> > > > > >> > > delaying is done in the fetch request. So, if we are not
> doing
> > > > > >> > > fetching in ReplicaFetchThread,
> > > > > >> > > I am not sure the existing throttling is effective. Also,
> when
> > > > > >> specifying
> > > > > >> > > the throttling of moving data across disks, it seems the
> user
> > > > > >> shouldn't
> > > > > >> > > care about whether a replica is a leader or a follower.
> > Reusing
> > > > the
> > > > > >> > > existing throttling config name will be awkward in this
> > regard.
> > > > > >> > > (f) It seems it's simpler and more consistent to use a
> > separate
> > > > > thread
> > > > > >> > pool
> > > > > >> > > for local data movement (for both leader and follower
> > replicas).
> > > > > This
> > > > > >> > > process can then be configured (e.g. number of threads, etc)
> > and
> > > > > >> > throttled
> > > > > >> > > independently.
> > > > > >> > >
> > > > > >> > > 1.3 Yes, we will need some synchronization there. So, if the
> > > > > movement
> > > > > >> > > thread catches up, gets the lock to do the swap, but
> realizes
> > > that
> > > > > new
> > > > > >> > data
> > > > > >> > > is added, it has to continue catching up while holding the
> > lock?
> > > > > >> > >
> > > > > >> > > 2.3 The benefit of including the desired log directory in
> > > > > >> > > LeaderAndIsrRequest
> > > > > >> > > during partition reassignment is that the controller doesn't
> > > need
> > > > to
> > > > > >> > track
> > > > > >> > > the progress for disk movement. So, you don't need the
> > > additional
> > > > > >> > > BrokerDirStateUpdateRequest. Then the controller never needs
> > to
> > > > > issue
> > > > > >> > > ChangeReplicaDirRequest.
> > > > > >> > > Only the admin tool will issue ChangeReplicaDirRequest to
> move
> > > > data
> > > > > >> > within
> > > > > >> > > a broker. I agree that this makes LeaderAndIsrRequest more
> > > > > >> complicated,
> > > > > >> > but
> > > > > >> > > that seems simpler than changing the controller to track
> > > > additional
> > > > > >> > states
> > > > > >> > > during partition reassignment.
> > > > > >> > >
> > > > > >> > > 4. We want to make a decision on how to expose the stats. So
> > > far,
> > > > we
> > > > > >> are
> > > > > >> > > exposing stats like the individual log size as JMX. So, one
> > way
> > > is
> > > > > to
> > > > > >> > just
> > > > > >> > > add new jmx to expose the log directory of individual
> > replicas.
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > >
> > > > > >> > > Jun
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <
> > lindon...@gmail.com>
> > > > > >> wrote:
> > > > > >> > >
> > > > > >> > > > Hey Jun,
> > > > > >> > > >
> > > > > >> > > > Thanks for all the comments! Please see my answer below. I
> > > have
> > > > > >> updated
> > > > > >> > > the
> > > > > >> > > > KIP to address most of the questions and make the KIP
> easier
> > > to
> > > > > >> > > understand.
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Dong
> > > > > >> > > >
> > > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io
> >
> > > > wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi, Dong,
> > > > > >> > > > >
> > > > > >> > > > > Thanks for the KIP. A few comments below.
> > > > > >> > > > >
> > > > > >> > > > > 1. For moving data across directories
> > > > > >> > > > > 1.1 I am not sure why we want to use
> ReplicaFetcherThread
> > to
> > > > > move
> > > > > >> > data
> > > > > >> > > > > around in the leader. ReplicaFetchThread fetches data
> from
> > > > > socket.
> > > > > >> > For
> > > > > >> > > > > moving data locally, it seems that we want to avoid the
> > > socket
> > > > > >> > > overhead.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > The purpose of using ReplicaFetchThread is to re-use
> > existing
> > > > > thread
> > > > > >> > > > instead of creating more threads and make our thread model
> > > more
> > > > > >> > complex.
> > > > > >> > > It
> > > > > >> > > > seems like a nature choice for copying data between disks
> > > since
> > > > it
> > > > > >> is
> > > > > >> > > > similar to copying data between brokers. Another reason is
> > > that
> > > > if
> > > > > >> the
> > > > > >> > > > replica to be moved is a follower, we don't need lock to
> > swap
> > > > > >> replicas
> > > > > >> > > when
> > > > > >> > > > destination replica has caught up, since the same thread
> > which
> > > > is
> > > > > >> > > fetching
> > > > > >> > > > data from leader will swap the replica.
> > > > > >> > > >
> > > > > >> > > > The ReplicaFetchThread will not incur socket overhead
> while
> > > > > copying
> > > > > >> > data
> > > > > >> > > > between disks. It will read directly from source disk (as
> we
> > > do
> > > > > when
> > > > > >> > > > processing FetchRequest) and write to destination disk (as
> > we
> > > do
> > > > > >> when
> > > > > >> > > > processing ProduceRequest).
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > > 1.2 I am also not sure about moving data in the
> > > > > >> ReplicaFetcherThread
> > > > > >> > in
> > > > > >> > > > the
> > > > > >> > > > > follower. For example, I am not sure setting
> > > > > >> replica.fetch.max.wait
> > > > > >> > to
> > > > > >> > > 0
> > > > > >> > > > >  is ideal. It may not always be effective since a fetch
> > > > request
> > > > > in
> > > > > >> > the
> > > > > >> > > > > ReplicaFetcherThread could be arbitrarily delayed due to
> > > > > >> replication
> > > > > >> > > > > throttling on the leader. In general, the data movement
> > > logic
> > > > > >> across
> > > > > >> > > > disks
> > > > > >> > > > > seems different from that in ReplicaFetcherThread. So, I
> > am
> > > > not
> > > > > >> sure
> > > > > >> > > why
> > > > > >> > > > > they need to be coupled.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > While it may not be the most efficient way to copy data
> > > between
> > > > > >> local
> > > > > >> > > > disks, it will be at least as efficient as copying data
> from
> > > > > leader
> > > > > >> to
> > > > > >> > > the
> > > > > >> > > > destination disk. The expected goal of KIP-113 is to
> enable
> > > data
> > > > > >> > movement
> > > > > >> > > > between disks with no less efficiency than what we do now
> > when
> > > > > >> moving
> > > > > >> > > data
> > > > > >> > > > between brokers. I think we can optimize its performance
> > using
> > > > > >> separate
> > > > > >> > > > thread if the performance is not good enough.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > > 1.3 Could you add a bit more details on how we swap the
> > > > replicas
> > > > > >> when
> > > > > >> > > the
> > > > > >> > > > > new ones are fully caught up? For example, what happens
> > when
> > > > the
> > > > > >> new
> > > > > >> > > > > replica in the new log directory is caught up, but when
> we
> > > > want
> > > > > >> to do
> > > > > >> > > the
> > > > > >> > > > > swap, some new data has arrived?
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > If the replica is a leader, then ReplicaFetcherThread will
> > > > perform
> > > > > >> the
> > > > > >> > > > replacement. Proper lock is needed to prevent
> > > > KafkaRequestHandler
> > > > > >> from
> > > > > >> > > > appending data to the topicPartition.log on the source
> disks
> > > > > before
> > > > > >> > this
> > > > > >> > > > replacement is completed by ReplicaFetcherThread.
> > > > > >> > > >
> > > > > >> > > > If the replica is a follower, because the same
> > > > ReplicaFetchThread
> > > > > >> which
> > > > > >> > > > fetches data from leader will also swap the replica , no
> > lock
> > > is
> > > > > >> > needed.
> > > > > >> > > >
> > > > > >> > > > I have updated the KIP to specify both more explicitly.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > > 1.4 Do we need to do the .move at the log segment level
> or
> > > > could
> > > > > >> we
> > > > > >> > > just
> > > > > >> > > > do
> > > > > >> > > > > that at the replica directory level? Renaming just a
> > > directory
> > > > > is
> > > > > >> > much
> > > > > >> > > > > faster than renaming the log segments.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > Great point. I have updated the KIP to rename the log
> > > directory
> > > > > >> > instead.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > > 1.5 Could you also describe a bit what happens when
> either
> > > the
> > > > > >> source
> > > > > >> > > or
> > > > > >> > > > > the target log directory fails while the data moving is
> in
> > > > > >> progress?
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > If source log directory fails, then the replica movement
> > will
> > > > stop
> > > > > >> and
> > > > > >> > > the
> > > > > >> > > > source replica is marked offline. If destination log
> > directory
> > > > > >> fails,
> > > > > >> > > then
> > > > > >> > > > the replica movement will stop. I have updated the KIP to
> > > > clarify
> > > > > >> this.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > >
> > > > > >> > > > > 2. For partition reassignment.
> > > > > >> > > > > 2.1 I am not sure if the controller can block on
> > > > > >> > > ChangeReplicaDirRequest.
> > > > > >> > > > > Data movement may take a long time to complete. If there
> > is
> > > an
> > > > > >> > > > outstanding
> > > > > >> > > > > request from the controller to a broker, that broker
> won't
> > > be
> > > > > >> able to
> > > > > >> > > > > process any new request from the controller. So if
> another
> > > > event
> > > > > >> > (e.g.
> > > > > >> > > > > broker failure) happens when the data movement is in
> > > progress,
> > > > > >> > > subsequent
> > > > > >> > > > > LeaderAnIsrRequest will be delayed.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > Yeah good point. I missed the fact that there is be only
> one
> > > > > >> inflight
> > > > > >> > > > request from controller to broker.
> > > > > >> > > >
> > > > > >> > > > How about I add a request, e.g.
> BrokerDirStateUpdateRequest,
> > > > which
> > > > > >> maps
> > > > > >> > > > topicPartition to log directory and can be sent from
> broker
> > to
> > > > > >> > controller
> > > > > >> > > > to indicate completion?
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > > 2.2 in the KIP, the partition reassignment tool is also
> > used
> > > > for
> > > > > >> > cases
> > > > > >> > > > > where an admin just wants to balance the existing data
> > > across
> > > > > log
> > > > > >> > > > > directories in the broker. In this case, it seems that
> > it's
> > > > over
> > > > > >> > > killing
> > > > > >> > > > to
> > > > > >> > > > > have the process go through the controller. A simpler
> > > approach
> > > > > is
> > > > > >> to
> > > > > >> > > > issue
> > > > > >> > > > > an RPC request to the broker directly.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > I agree we can optimize this case. It is just that we have
> > to
> > > > add
> > > > > >> new
> > > > > >> > > logic
> > > > > >> > > > or code path to handle a scenario that is already covered
> by
> > > the
> > > > > >> more
> > > > > >> > > > complicated scenario. I will add it to the KIP.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > > 2.3 When using the partition reassignment tool to move
> > > > replicas
> > > > > >> > across
> > > > > >> > > > > brokers, it make sense to be able to specify the log
> > > directory
> > > > > of
> > > > > >> the
> > > > > >> > > > newly
> > > > > >> > > > > created replicas. The KIP does that in two separate
> > requests
> > > > > >> > > > > ChangeReplicaDirRequest and LeaderAndIsrRequest, and
> > tracks
> > > > the
> > > > > >> > > progress
> > > > > >> > > > of
> > > > > >> > > > > each independently. An alternative is to do that just in
> > > > > >> > > > > LeaderAndIsrRequest.
> > > > > >> > > > > That way, the new replicas will be created in the right
> > log
> > > > dir
> > > > > in
> > > > > >> > the
> > > > > >> > > > > first place and the controller just needs to track the
> > > > progress
> > > > > of
> > > > > >> > > > > partition reassignment in the current way.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > I agree it is better to use one request instead of two to
> > > > request
> > > > > >> > replica
> > > > > >> > > > movement between disks. But I think the performance
> > advantage
> > > of
> > > > > >> doing
> > > > > >> > so
> > > > > >> > > > is negligible because we trigger replica assignment much
> > less
> > > > than
> > > > > >> all
> > > > > >> > > > other kinds of events in the Kafka cluster. I am not sure
> > that
> > > > the
> > > > > >> > > benefit
> > > > > >> > > > of doing this is worth the effort to add an optional
> string
> > > > field
> > > > > in
> > > > > >> > the
> > > > > >> > > > LeaderAndIsrRequest. Also if we add this optional field in
> > the
> > > > > >> > > > LeaderAndIsrRequest, we probably want to remove
> > > > > >> ChangeReplicaDirRequest
> > > > > >> > > to
> > > > > >> > > > avoid having two requests doing the same thing. But it
> means
> > > > user
> > > > > >> > script
> > > > > >> > > > can not send request directly to the broker to trigger
> > replica
> > > > > >> movement
> > > > > >> > > > between log directories.
> > > > > >> > > >
> > > > > >> > > > I will do it if you are strong about this optimzation.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > >
> > > > > >> > > > > 3. /admin/reassign_partitions: Including the log dir in
> > > every
> > > > > >> replica
> > > > > >> > > may
> > > > > >> > > > > not be efficient. We could include a list of log
> > directories
> > > > and
> > > > > >> > > > reference
> > > > > >> > > > > the index of the log directory in each replica.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > Good point. I have updated the KIP to use this solution.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > >
> > > > > >> > > > > 4. DescribeDirsRequest: The stats in the request are
> > already
> > > > > >> > available
> > > > > >> > > > from
> > > > > >> > > > > JMX. Do we need the new request?
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > Does JMX also include the state (i.e. offline or online)
> of
> > > each
> > > > > log
> > > > > >> > > > directory and the log directory of each replica? If not,
> > then
> > > > > maybe
> > > > > >> we
> > > > > >> > > > still need DescribeDirsRequest?
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > >
> > > > > >> > > > > 5. We want to be consistent on ChangeReplicaDirRequest
> vs
> > > > > >> > > > > ChangeReplicaRequest.
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > > > I think ChangeReplicaRequest and ChangeReplicaResponse is
> my
> > > > typo.
> > > > > >> > Sorry,
> > > > > >> > > > they are fixed now.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > >
> > > > > >> > > > > Jun
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <
> > > lindon...@gmail.com
> > > > >
> > > > > >> > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hey ALexey,
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks for all the comments!
> > > > > >> > > > > >
> > > > > >> > > > > > I have updated the KIP to specify how we enforce
> quota.
> > I
> > > > also
> > > > > >> > > updated
> > > > > >> > > > > the
> > > > > >> > > > > > "The thread model and broker logic for moving replica
> > data
> > > > > >> between
> > > > > >> > > log
> > > > > >> > > > > > directories" to make it easier to read. You can find
> the
> > > > exact
> > > > > >> > change
> > > > > >> > > > > here
> > > > > >> > > > > > <https://cwiki.apache.org/conf
> > > > luence/pages/diffpagesbyversio
> > > > > >> > > > > > n.action?pageId=67638408&selec
> > > > tedPageVersions=5&selectedPage
> > > > > >> > > > Versions=6>.
> > > > > >> > > > > > The idea is to use the same replication quota
> mechanism
> > > > > >> introduced
> > > > > >> > in
> > > > > >> > > > > > KIP-73.
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Dong
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <
> > > > > >> > > aozerit...@yandex.ru
> > > > > >> > > > >
> > > > > >> > > > > > wrote:
> > > > > >> > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com
> >:
> > > > > >> > > > > > > > Hey Alexey,
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Thanks. I think we agreed that the suggested
> > solution
> > > > > >> doesn't
> > > > > >> > > work
> > > > > >> > > > in
> > > > > >> > > > > > > > general for kafka users. To answer your questions:
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > 1. I agree we need quota to rate limit replica
> > > movement
> > > > > >> when a
> > > > > >> > > > broker
> > > > > >> > > > > > is
> > > > > >> > > > > > > > moving a "leader" replica. I will come up with
> > > solution,
> > > > > >> > probably
> > > > > >> > > > > > re-use
> > > > > >> > > > > > > > the config of replication quota introduced in
> > KIP-73.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > 2. Good point. I agree that this is a problem in
> > > > general.
> > > > > >> If is
> > > > > >> > > no
> > > > > >> > > > > new
> > > > > >> > > > > > > data
> > > > > >> > > > > > > > on that broker, with current default value of
> > > > > >> > > > > > replica.fetch.wait.max.ms
> > > > > >> > > > > > > > and replica.fetch.max.bytes, the replica will be
> > moved
> > > > at
> > > > > >> only
> > > > > >> > 2
> > > > > >> > > > MBps
> > > > > >> > > > > > > > throughput. I think the solution is for broker to
> > set
> > > > > >> > > > > > > > replica.fetch.wait.max.ms to 0 in its
> FetchRequest
> > if
> > > > the
> > > > > >> > > > > > corresponding
> > > > > >> > > > > > > > ReplicaFetcherThread needs to move some replica to
> > > > another
> > > > > >> > disk.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > 3. I have updated the KIP to mention that the read
> > > size
> > > > > of a
> > > > > >> > > given
> > > > > >> > > > > > > > partition is configured using
> > replica.fetch.max.bytes
> > > > when
> > > > > >> we
> > > > > >> > > move
> > > > > >> > > > > > > replicas
> > > > > >> > > > > > > > between disks.
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > Please see this
> > > > > >> > > > > > > > <https://cwiki.apache.org/conf
> > > > > >> luence/pages/diffpagesbyversio
> > > > > >> > > > n.action
> > > > > >> > > > > ?
> > > > > >> > > > > > > pageId=67638408&selectedPageVe
> > > > > rsions=4&selectedPageVersions=
> > > > > >> 5>
> > > > > >> > > > > > > > for the change of the KIP. I will come up with a
> > > > solution
> > > > > to
> > > > > >> > > > throttle
> > > > > >> > > > > > > > replica movement when a broker is moving a
> "leader"
> > > > > replica.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Thanks. It looks great.
> > > > > >> > > > > > >
> > > > > >> > > > > > > >
> > > > > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky
> <
> > > > > >> > > > > > aozerit...@yandex.ru>
> > > > > >> > > > > > > > wrote:
> > > > > >> > > > > > > >
> > > > > >> > > > > > > >>  23.01.2017, 22:11, "Dong Lin" <
> > lindon...@gmail.com
> > > >:
> > > > > >> > > > > > > >>  > Thanks. Please see my comment inline.
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey
> > Ozeritsky
> > > <
> > > > > >> > > > > > > aozerit...@yandex.ru>
> > > > > >> > > > > > > >>  > wrote:
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  >> 13.01.2017, 22:29, "Dong Lin" <
> > > lindon...@gmail.com
> > > > >:
> > > > > >> > > > > > > >>  >> > Hey Alexey,
> > > > > >> > > > > > > >>  >> >
> > > > > >> > > > > > > >>  >> > Thanks for your review and the alternative
> > > > > approach.
> > > > > >> > Here
> > > > > >> > > is
> > > > > >> > > > > my
> > > > > >> > > > > > > >>  >> > understanding of your patch. kafka's
> > background
> > > > > >> threads
> > > > > >> > > are
> > > > > >> > > > > used
> > > > > >> > > > > > > to
> > > > > >> > > > > > > >>  move
> > > > > >> > > > > > > >>  >> > data between replicas. When data movement
> is
> > > > > >> triggered,
> > > > > >> > > the
> > > > > >> > > > > log
> > > > > >> > > > > > > will
> > > > > >> > > > > > > >>  be
> > > > > >> > > > > > > >>  >> > rolled and the new logs will be put in the
> > new
> > > > > >> > directory,
> > > > > >> > > > and
> > > > > >> > > > > > > >>  background
> > > > > >> > > > > > > >>  >> > threads will move segment from old
> directory
> > to
> > > > new
> > > > > >> > > > directory.
> > > > > >> > > > > > > >>  >> >
> > > > > >> > > > > > > >>  >> > It is important to note that KIP-112 is
> > > intended
> > > > to
> > > > > >> work
> > > > > >> > > > with
> > > > > >> > > > > > > >>  KIP-113 to
> > > > > >> > > > > > > >>  >> > support JBOD. I think your solution is
> > > definitely
> > > > > >> > simpler
> > > > > >> > > > and
> > > > > >> > > > > > > better
> > > > > >> > > > > > > >>  >> under
> > > > > >> > > > > > > >>  >> > the current kafka implementation that a
> > broker
> > > > will
> > > > > >> fail
> > > > > >> > > if
> > > > > >> > > > > any
> > > > > >> > > > > > > disk
> > > > > >> > > > > > > >>  >> fails.
> > > > > >> > > > > > > >>  >> > But I am not sure if we want to allow
> broker
> > to
> > > > run
> > > > > >> with
> > > > > >> > > > > partial
> > > > > >> > > > > > > >>  disks
> > > > > >> > > > > > > >>  >> > failure. Let's say the a replica is being
> > moved
> > > > > from
> > > > > >> > > > > log_dir_old
> > > > > >> > > > > > > to
> > > > > >> > > > > > > >>  >> > log_dir_new and then log_dir_old stops
> > working
> > > > due
> > > > > to
> > > > > >> > disk
> > > > > >> > > > > > > failure.
> > > > > >> > > > > > > >>  How
> > > > > >> > > > > > > >>  >> > would your existing patch handles it? To
> make
> > > the
> > > > > >> > > scenario a
> > > > > >> > > > > bit
> > > > > >> > > > > > > more
> > > > > >> > > > > > > >>  >>
> > > > > >> > > > > > > >>  >> We will lose log_dir_old. After broker
> restart
> > we
> > > > can
> > > > > >> read
> > > > > >> > > the
> > > > > >> > > > > > data
> > > > > >> > > > > > > >>  from
> > > > > >> > > > > > > >>  >> log_dir_new.
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  > No, you probably can't. This is because the
> > broker
> > > > > >> doesn't
> > > > > >> > > have
> > > > > >> > > > > > > *all* the
> > > > > >> > > > > > > >>  > data for this partition. For example, say the
> > > broker
> > > > > has
> > > > > >> > > > > > > >>  > partition_segement_1, partition_segment_50 and
> > > > > >> > > > > > partition_segment_100
> > > > > >> > > > > > > on
> > > > > >> > > > > > > >>  the
> > > > > >> > > > > > > >>  > log_dir_old. partition_segment_100, which has
> > the
> > > > > latest
> > > > > >> > > data,
> > > > > >> > > > > has
> > > > > >> > > > > > > been
> > > > > >> > > > > > > >>  > moved to log_dir_new, and the log_dir_old
> fails
> > > > before
> > > > > >> > > > > > > >>  partition_segment_50
> > > > > >> > > > > > > >>  > and partition_segment_1 is moved to
> log_dir_new.
> > > > When
> > > > > >> > broker
> > > > > >> > > > > > > re-starts,
> > > > > >> > > > > > > >>  it
> > > > > >> > > > > > > >>  > won't have partition_segment_50. This causes
> > > problem
> > > > > if
> > > > > >> > > broker
> > > > > >> > > > is
> > > > > >> > > > > > > elected
> > > > > >> > > > > > > >>  > leader and consumer wants to consume data in
> the
> > > > > >> > > > > > partition_segment_1.
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  Right.
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  >> > complicated, let's say the broker is
> > shtudown,
> > > > > >> > > log_dir_old's
> > > > > >> > > > > > disk
> > > > > >> > > > > > > >>  fails,
> > > > > >> > > > > > > >>  >> > and the broker starts. In this case broker
> > > > doesn't
> > > > > >> even
> > > > > >> > > know
> > > > > >> > > > > if
> > > > > >> > > > > > > >>  >> log_dir_new
> > > > > >> > > > > > > >>  >> > has all the data needed for this replica.
> It
> > > > > becomes
> > > > > >> a
> > > > > >> > > > problem
> > > > > >> > > > > > if
> > > > > >> > > > > > > the
> > > > > >> > > > > > > >>  >> > broker is elected leader of this partition
> in
> > > > this
> > > > > >> case.
> > > > > >> > > > > > > >>  >>
> > > > > >> > > > > > > >>  >> log_dir_new contains the most recent data so
> we
> > > > will
> > > > > >> lose
> > > > > >> > > the
> > > > > >> > > > > tail
> > > > > >> > > > > > > of
> > > > > >> > > > > > > >>  >> partition.
> > > > > >> > > > > > > >>  >> This is not a big problem for us because we
> > > already
> > > > > >> delete
> > > > > >> > > > tails
> > > > > >> > > > > > by
> > > > > >> > > > > > > >>  hand
> > > > > >> > > > > > > >>  >> (see https://issues.apache.org/jira
> > > > > /browse/KAFKA-1712
> > > > > >> ).
> > > > > >> > > > > > > >>  >> Also we dont use authomatic leader balancing
> > > > > >> > > > > > > >>  (auto.leader.rebalance.enable=false),
> > > > > >> > > > > > > >>  >> so this partition becomes the leader with a
> low
> > > > > >> > probability.
> > > > > >> > > > > > > >>  >> I think my patch can be modified to prohibit
> > the
> > > > > >> selection
> > > > > >> > > of
> > > > > >> > > > > the
> > > > > >> > > > > > > >>  leader
> > > > > >> > > > > > > >>  >> until the partition does not move completely.
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  > I guess you are saying that you have deleted
> the
> > > > tails
> > > > > >> by
> > > > > >> > > hand
> > > > > >> > > > in
> > > > > >> > > > > > > your
> > > > > >> > > > > > > >>  own
> > > > > >> > > > > > > >>  > kafka branch. But KAFKA-1712 is not accepted
> > into
> > > > > Kafka
> > > > > >> > trunk
> > > > > >> > > > > and I
> > > > > >> > > > > > > am
> > > > > >> > > > > > > >>  not
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  No. We just modify segments mtime by cron job.
> > This
> > > > > works
> > > > > >> > with
> > > > > >> > > > > > vanilla
> > > > > >> > > > > > > >>  kafka.
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  > sure if it is the right solution. How would
> this
> > > > > >> solution
> > > > > >> > > > address
> > > > > >> > > > > > the
> > > > > >> > > > > > > >>  > problem mentioned above?
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  If you need only fresh data and if you remove
> old
> > > data
> > > > > by
> > > > > >> > hands
> > > > > >> > > > > this
> > > > > >> > > > > > is
> > > > > >> > > > > > > >>  not a problem. But in general case
> > > > > >> > > > > > > >>  this is a problem of course.
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  > BTW, I am not sure the solution mentioned in
> > > > > KAFKA-1712
> > > > > >> is
> > > > > >> > > the
> > > > > >> > > > > > right
> > > > > >> > > > > > > way
> > > > > >> > > > > > > >>  to
> > > > > >> > > > > > > >>  > address its problem. Now that we have
> timestamp
> > in
> > > > the
> > > > > >> > > message
> > > > > >> > > > we
> > > > > >> > > > > > > can use
> > > > > >> > > > > > > >>  > that to delete old segement instead of relying
> > on
> > > > the
> > > > > >> log
> > > > > >> > > > segment
> > > > > >> > > > > > > mtime.
> > > > > >> > > > > > > >>  > Just some idea and we don't have to discuss
> this
> > > > > problem
> > > > > >> > > here.
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  >> >
> > > > > >> > > > > > > >>  >> > The solution presented in the KIP attempts
> to
> > > > > handle
> > > > > >> it
> > > > > >> > by
> > > > > >> > > > > > > replacing
> > > > > >> > > > > > > >>  >> > replica in an atomic version fashion after
> > the
> > > > log
> > > > > in
> > > > > >> > the
> > > > > >> > > > new
> > > > > >> > > > > > dir
> > > > > >> > > > > > > has
> > > > > >> > > > > > > >>  >> fully
> > > > > >> > > > > > > >>  >> > caught up with the log in the old dir. At
> at
> > > time
> > > > > the
> > > > > >> > log
> > > > > >> > > > can
> > > > > >> > > > > be
> > > > > >> > > > > > > >>  >> considered
> > > > > >> > > > > > > >>  >> > to exist on only one log directory.
> > > > > >> > > > > > > >>  >>
> > > > > >> > > > > > > >>  >> As I understand your solution does not cover
> > > > quotas.
> > > > > >> > > > > > > >>  >> What happens if someone starts to transfer
> 100
> > > > > >> partitions
> > > > > >> > ?
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  > Good point. Quota can be implemented in the
> > > future.
> > > > It
> > > > > >> is
> > > > > >> > > > > currently
> > > > > >> > > > > > > >>  > mentioned as as a potential future improvement
> > in
> > > > > >> KIP-112
> > > > > >> > > > > > > >>  > <https://cwiki.apache.org/conf
> > > > > luence/display/KAFKA/KIP-
> > > > > >> > 112%3
> > > > > >> > > > > > > >>  A+Handle+disk+failure+for+JBOD>.Thanks
> > > > > >> > > > > > > >>  > for the reminder. I will move it to KIP-113.
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  >> > If yes, it will read a ByteBufferMessageSet
> > > from
> > > > > >> > > > > > > topicPartition.log
> > > > > >> > > > > > > >>  and
> > > > > >> > > > > > > >>  >> append the message set to topicPartition.move
> > > > > >> > > > > > > >>  >>
> > > > > >> > > > > > > >>  >> i.e. processPartitionData will read data from
> > the
> > > > > >> > beginning
> > > > > >> > > of
> > > > > >> > > > > > > >>  >> topicPartition.log? What is the read size?
> > > > > >> > > > > > > >>  >> A ReplicaFetchThread reads many partitions so
> > if
> > > > one
> > > > > >> does
> > > > > >> > > some
> > > > > >> > > > > > > >>  complicated
> > > > > >> > > > > > > >>  >> work (= read a lot of data from disk)
> > everything
> > > > will
> > > > > >> slow
> > > > > >> > > > down.
> > > > > >> > > > > > > >>  >> I think read size should not be very big.
> > > > > >> > > > > > > >>  >>
> > > > > >> > > > > > > >>  >> On the other hand at this point
> > > > > (processPartitionData)
> > > > > >> one
> > > > > >> > > can
> > > > > >> > > > > use
> > > > > >> > > > > > > only
> > > > > >> > > > > > > >>  >> the new data (ByteBufferMessageSet from
> > > parameters)
> > > > > and
> > > > > >> > wait
> > > > > >> > > > > until
> > > > > >> > > > > > > >>  >> (topicPartition.move.smallestOffset <=
> > > > > >> > > > > > > topicPartition.log.smallestOff
> > > > > >> > > > > > > >>  set
> > > > > >> > > > > > > >>  >> && topicPartition.log.largestOffset ==
> > > > > >> > > > > > > topicPartition.log.largestOffs
> > > > > >> > > > > > > >>  et).
> > > > > >> > > > > > > >>  >> In this case the write speed to
> > > topicPartition.move
> > > > > and
> > > > > >> > > > > > > >>  topicPartition.log
> > > > > >> > > > > > > >>  >> will be the same so this will allow us to
> move
> > > many
> > > > > >> > > partitions
> > > > > >> > > > > to
> > > > > >> > > > > > > one
> > > > > >> > > > > > > >>  disk.
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  > The read size of a given partition is
> configured
> > > > > >> > > > > > > >>  > using replica.fetch.max.bytes, which is the
> same
> > > > size
> > > > > >> used
> > > > > >> > by
> > > > > >> > > > > > > >>  FetchRequest
> > > > > >> > > > > > > >>  > from follower to leader. If the broker is
> > moving a
> > > > > >> replica
> > > > > >> > > for
> > > > > >> > > > > > which
> > > > > >> > > > > > > it
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  OK. Could you mention it in KIP?
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  > acts as a follower, the disk write rate for
> > moving
> > > > > this
> > > > > >> > > replica
> > > > > >> > > > > is
> > > > > >> > > > > > at
> > > > > >> > > > > > > >>  most
> > > > > >> > > > > > > >>  > the rate it fetches from leader (assume it is
> > > > catching
> > > > > >> up
> > > > > >> > and
> > > > > >> > > > has
> > > > > >> > > > > > > >>  > sufficient data to read from leader, which is
> > > > subject
> > > > > to
> > > > > >> > > > > > > round-trip-time
> > > > > >> > > > > > > >>  > between itself and the leader. Thus this part
> if
> > > > > >> probably
> > > > > >> > > fine
> > > > > >> > > > > even
> > > > > >> > > > > > > >>  without
> > > > > >> > > > > > > >>  > quota.
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  I think there are 2 problems
> > > > > >> > > > > > > >>  1. Without speed limiter this will not work good
> > > even
> > > > > for
> > > > > >> 1
> > > > > >> > > > > > partition.
> > > > > >> > > > > > > In
> > > > > >> > > > > > > >>  our production we had a problem so we did the
> > > throuput
> > > > > >> > limiter:
> > > > > >> > > > > > > >>  https://github.com/resetius/ka
> > > > > >> fka/commit/cda31dadb2f135743bf
> > > > > >> > > > > > > >>  41083062927886c5ddce1#diff-ffa
> > > > > >> 8861e850121997a534ebdde2929c6R
> > > > > >> > > 713
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  2. I dont understand how it will work in case of
> > big
> > > > > >> > > > > > > >>  replica.fetch.wait.max.ms and partition with
> > > > irregular
> > > > > >> flow.
> > > > > >> > > > > > > >>  For example someone could have
> > > > > replica.fetch.wait.max.ms
> > > > > >> > =10mi
> > > > > >> > > > nutes
> > > > > >> > > > > > and
> > > > > >> > > > > > > >>  partition that has very high data flow from
> 12:00
> > to
> > > > > 13:00
> > > > > >> > and
> > > > > >> > > > zero
> > > > > >> > > > > > > flow
> > > > > >> > > > > > > >>  otherwise.
> > > > > >> > > > > > > >>  In this case processPartitionData could be
> called
> > > once
> > > > > per
> > > > > >> > > > > 10minutes
> > > > > >> > > > > > > so if
> > > > > >> > > > > > > >>  we start data moving in 13:01 it will be
> finished
> > > next
> > > > > >> day.
> > > > > >> > > > > > > >>
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  > But ff the broker is moving a replica for
> which
> > it
> > > > > acts
> > > > > >> as
> > > > > >> > a
> > > > > >> > > > > > leader,
> > > > > >> > > > > > > as
> > > > > >> > > > > > > >>  of
> > > > > >> > > > > > > >>  > current KIP the broker will keep reading from
> > > > > >> log_dir_old
> > > > > >> > and
> > > > > >> > > > > > append
> > > > > >> > > > > > > to
> > > > > >> > > > > > > >>  > log_dir_new without having to wait for
> > > > > round-trip-time.
> > > > > >> We
> > > > > >> > > > > probably
> > > > > >> > > > > > > need
> > > > > >> > > > > > > >>  > quota for this in the future.
> > > > > >> > > > > > > >>  >
> > > > > >> > > > > > > >>  >> >
> > > > > >> > > > > > > >>  >> > And to answer your question, yes
> > > > topicpartition.log
> > > > > >> > refers
> > > > > >> > > > to
> > > > > >> > > > > > > >>  >> > topic-paritition/segment.log.
> > > > > >> > > > > > > >>  >> >
> > > > > >> > > > > > > >>  >> > Thanks,
> > > > > >> > > > > > > >>  >> > Dong
> > > > > >> > > > > > > >>  >> >
> > > > > >> > > > > > > >>  >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey
> > > > Ozeritsky <
> > > > > >> > > > > > > >>  aozerit...@yandex.ru>
> > > > > >> > > > > > > >>  >> > wrote:
> > > > > >> > > > > > > >>  >> >
> > > > > >> > > > > > > >>  >> >> Hi,
> > > > > >> > > > > > > >>  >> >>
> > > > > >> > > > > > > >>  >> >> We have the similar solution that have
> been
> > > > > working
> > > > > >> in
> > > > > >> > > > > > production
> > > > > >> > > > > > > >>  since
> > > > > >> > > > > > > >>  >> >> 2014. You can see it here:
> > > > > >> > > https://github.com/resetius/ka
> > > > > >> > > > > > > >>  >> >> fka/commit/20658593e246d218490
> > > > > 6879defa2e763c4d413fb
> > > > > >> > > > > > > >>  >> >> The idea is very simple
> > > > > >> > > > > > > >>  >> >> 1. Disk balancer runs in a separate thread
> > > > inside
> > > > > >> > > scheduler
> > > > > >> > > > > > pool.
> > > > > >> > > > > > > >>  >> >> 2. It does not touch empty partitions
> > > > > >> > > > > > > >>  >> >> 3. Before it moves a partition it forcibly
> > > > creates
> > > > > >> new
> > > > > >> > > > > segment
> > > > > >> > > > > > > on a
> > > > > >> > > > > > > >>  >> >> destination disk
> > > > > >> > > > > > > >>  >> >> 4. It moves segment by segment from new to
> > > old.
> > > > > >> > > > > > > >>  >> >> 5. Log class works with segments on both
> > disks
> > > > > >> > > > > > > >>  >> >>
> > > > > >> > > > > > > >>  >> >> Your approach seems too complicated,
> > moreover
> > > it
> > > > > >> means
> > > > > >> > > that
> > > > > >> > > > > you
> > > > > >> > > > > > > >>  have to
> > > > > >> > > > > > > >>  >> >> patch different components of the system
> > > > > >> > > > > > > >>  >> >> Could you clarify what do you mean by
> > > > > >> > topicPartition.log?
> > > > > >> > > > Is
> > > > > >> > > > > it
> > > > > >> > > > > > > >>  >> >> topic-paritition/segment.log ?
> > > > > >> > > > > > > >>  >> >>
> > > > > >> > > > > > > >>  >> >> 12.01.2017, 21:47, "Dong Lin" <
> > > > > lindon...@gmail.com
> > > > > >> >:
> > > > > >> > > > > > > >>  >> >> > Hi all,
> > > > > >> > > > > > > >>  >> >> >
> > > > > >> > > > > > > >>  >> >> > We created KIP-113: Support replicas
> > > movement
> > > > > >> between
> > > > > >> > > log
> > > > > >> > > > > > > >>  >> directories.
> > > > > >> > > > > > > >>  >> >> > Please find the KIP wiki in the link
> > > > > >> > > > > > > >>  >> >> > *https://cwiki.apache.org/conf
> > > > > >> > > > > luence/display/KAFKA/KIP-113%
> > > > > >> > > > > > > >>  >> >> 3A+Support+replicas+movement+b
> > > > > >> etween+log+directories
> > > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/conf
> > > > > >> > > > > luence/display/KAFKA/KIP-113%
> > > > > >> > > > > > > >>  >> >> 3A+Support+replicas+movement+
> > > > > >> > between+log+directories>.*
> > > > > >> > > > > > > >>  >> >> >
> > > > > >> > > > > > > >>  >> >> > This KIP is related to KIP-112
> > > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/conf
> > > > > >> > > > > luence/display/KAFKA/KIP-112%
> > > > > >> > > > > > > >>  >> >> 3A+Handle+disk+failure+for+JBOD>:
> > > > > >> > > > > > > >>  >> >> > Handle disk failure for JBOD. They are
> > > needed
> > > > in
> > > > > >> > order
> > > > > >> > > to
> > > > > >> > > > > > > support
> > > > > >> > > > > > > >>  >> JBOD in
> > > > > >> > > > > > > >>  >> >> > Kafka. Please help review the KIP. You
> > > > feedback
> > > > > is
> > > > > >> > > > > > appreciated!
> > > > > >> > > > > > > >>  >> >> >
> > > > > >> > > > > > > >>  >> >> > Thanks,
> > > > > >> > > > > > > >>  >> >> > Dong
> > > > > >> > > > > > >
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to