Hey Jun,

After thinking about 14 more, I think your solution is reasonable. I have
updated the KIP to specify that the number of ReplicaMoveThread defaults
to # log dirs.

Thanks!
Dong


On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks for your comment! Please see my reply below.
>
> On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply.
>>
>> 10. Could you comment on that?
>>
>
> Sorry, I missed that comment.
>
> Good point. I think the log segments in topicPartition.move directory will
> be subject to log truncation, log retention and log cleaning in the same
> way as the log segments in the source log directory. I just specified this
> inthe KIP.
>
>
>>
>> 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
>> broker
>> restarts after it sends ChangeReplicaDirResponse but before it receives
>> LeaderAndIsrRequest."
>>
>> In that case, the reassignment tool could detect that through
>> DescribeDirsRequest
>> and issue ChangeReplicaDirRequest again, right? In the common case, this
>> is
>> probably not needed and we only need to write each replica once.
>>
>> My main concern with the approach in the current KIP is that once a new
>> replica is created in the wrong log dir, the cross log directory movement
>> may not catch up until the new replica is fully bootstrapped. So, we end
>> up
>> writing the data for the same replica twice.
>>
>
> I agree with your concern. My main concern is that it is a bit weird if
> ChangeReplicaDirResponse can not guarantee success and the tool needs to
> rely on DescribeDirResponse to see if it needs to send
> ChangeReplicaDirRequest again.
>
> How about this: If broker doesn't not have already replica created for the
> specified topicParition when it receives ChangeReplicaDirRequest, it will
> reply ReplicaNotAvailableException AND remember (replica, destination log
> directory) pair in memory to create the replica in the specified log
> directory.
>
>
>>
>> 11.3 Are you saying the value in --throttle will be used to set both
>> intra.broker.throttled.rate and leader.follower.replication.
>> throttled.replicas?
>>
>
> No. --throttle will be used to only to set leader.follower.replication as
> it does now. I think we do not need any option in the
> kafka-reassignment-partitions.sh to specify intra.broker.throttled.rate.
> User canset it in broker config or dynamically using kafka-config.sh. Does
> this sound OK?
>
>
>>
>> 12.2 If the user only wants to check one topic, the tool could do the
>> filtering on the client side, right? My concern with having both log_dirs
>> and topics is the semantic. For example, if both are not empty, do we
>> return the intersection or the union?
>>
>
> Yes the tool could filter on the client side. But the purpose of having
> this field is to reduce response side in case broker has a lot of topics.
> The both fields are used as filter and the result is intersection. Do you
> think this semantic is confusing or counter-intuitive?
>
>
>>
>> 12.3. Yes, firstOffset may not be useful.
>>
>> 14. Hmm, I would think moving data across log dirs will be io bound. We
>> also have num.recovery.threads.per.data.dir, which defaults to 1. So,
>> having num.replica.move.threads defaults to # log dirs or half of that (to
>> account for ios on both source and target) seems reasonable. Is a magical
>> value of 3 going to be better? Does that work with only 2 log dirs? There
>> will always be cases when the user needs to customize the value. We just
>> need a reasonable default to cover the common case.
>>
>
> If the throughput of moving data across dir doesn't not increase with
> number of threads, I think we should provide config
> num.replica.move.thread.per.log.dir and give it default value of 1. That
> works in the same way as num.recovery.threads.per.data.dir. But I think
> the replica movement is not necessarily IO bound if broker is using SSD.
> Thus it seems more reasonable to have config num.replica.move.threads that
> is shared across all log directories.
>
> Currently all Kafka configs, including num.recovery.threads.per.data.dir,
> defaults to a constant value instead of relying on values of configs. This
> it will be a bit weird if the config name itself is not per log dir but its
> default value is per dir. And it will also make both code and user
> documentation a bit more complicated because currently all configs,
> including num.recovery.threads.per.data.dir, defaults to a constant
> value. The advantage of using a magic value is simplicity. To answer your
> question, I think 3 ReplicaMoveThreads can work with more than 2 log
> directories. Say there are 3 ReplicaMoveThreads and 4 log directories, each
> ReplicaMoveThread will check if there is any replica waiting for movement,
> finish movement of this replica, and check again. Is there any concern with
> this approach?
>
> I have chosen the magic value 3 because current default number of network
> threads is 3. We can also set it to 8 which is the default number of io
> threads. Would there be any performance concern with using 8 threads by
> default?
>
>
>
>>
>> 20. Should we support canceling the movement across log dirs? I was
>> thinking this can be achieved with a ChangeReplicaDirRequest with dir =
>> any.
>>
>
> As of current KIP user can cancel movement across log directories by first
> sending DescribeDirsRequest, figure out the source directory of those
> replicas that are being moved, and then send ChangeReplicaDirRequest to
> move replica to the source log directory. But "any" seems like an easier
> and reasonable approach to cancel replica movement. I just added it to the
> KIP.
>
>
>>
>> Jun
>>
>>
>> On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks much for your detailed comments. Please see my reply below.
>> >
>> > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Thanks for the updated KIP. Some more comments below.
>> > >
>> > > 10. For the .move log, do we perform any segment deletion (based on
>> > > retention) or log cleaning (if a compacted topic)? Or do we only
>> enable
>> > > that after the swap?
>> > >
>> > > 11. kafka-reassign-partitions.sh
>> > > 11.1 If all reassigned replicas are in the current broker and only the
>> > log
>> > > directories have changed, we can probably optimize the tool to not
>> > trigger
>> > > partition reassignment through the controller and only
>> > > send ChangeReplicaDirRequest.
>> > >
>> >
>> > Yes, the reassignment script should not create the reassignment znode
>> if no
>> > replicas are not be moved between brokers. This falls into the "How to
>> move
>> > replica between log directories on the same broker" of the Proposed
>> Change
>> > section.
>> >
>> >
>> > > 11.2 If ChangeReplicaDirRequest specifies a replica that's not created
>> > yet,
>> > > could the broker just remember that in memory and create the replica
>> when
>> > > the creation is requested? This way, when doing cluster expansion, we
>> can
>> > > make sure that the new replicas on the new brokers are created in the
>> > right
>> > > log directory in the first place. We can also avoid the tool having to
>> > keep
>> > > issuing ChangeReplicaDirRequest in response to
>> > > ReplicaNotAvailableException.
>> > >
>> >
>> > I am concerned that the ChangeReplicaDirRequest would be lost if broker
>> > restarts after it sends ChangeReplicaDirResponse but before it receives
>> > LeaderAndIsrRequest. In this case, the user will receive success when
>> they
>> > initiate replica reassignment, but replica reassignment will never
>> complete
>> > when they verify the reassignment later. This would be confusing to
>> user.
>> >
>> > There are three different approaches to this problem if broker has not
>> > created replica yet after it receives ChangeReplicaDirResquest:
>> >
>> > 1) Broker immediately replies to user with ReplicaNotAvailableException
>> and
>> > user can decide to retry again later. The advantage of this solution is
>> > that the broker logic is very simple and the reassignment script logic
>> also
>> > seems straightforward. The disadvantage is that user script has to
>> retry.
>> > But it seems fine - we can set interval between retries to be 0.5 sec so
>> > that broker want be bombarded by those requests. This is the solution
>> > chosen in the current KIP.
>> >
>> > 2) Broker can put ChangeReplicaDirRequest in a purgatory with timeout
>> and
>> > replies to user after the replica has been created. I didn't choose
>> this in
>> > the interest of keeping broker logic simpler.
>> >
>> > 3) Broker can remember that by making a mark in the disk, e.g. create
>> > topicPartition.tomove directory in the destination log directory. This
>> mark
>> > will be persisted across broker restart. This is the first idea I had
>> but I
>> > replaced it with solution 1) in the interest of keeping broker simple.
>> >
>> > It seems that solution 1) is the simplest one that works. But I am OK to
>> > switch to the other two solutions if we don't want the retry logic.
>> What do
>> > you think?
>> >
>> >
>> > 11.3 Do we need an option in the tool to specify intra.broker.
>> > > throttled.rate?
>> > >
>> >
>> > I don't find it useful to add this option to
>> kafka-reassign-partitions.sh.
>> > The reason we have the option "--throttle" in the script to throttle
>> > replication rate is that we usually want higher quota to fix an offline
>> > replica to get out of URP. But we are OK to have a lower quota if we are
>> > moving replica only to balance the cluster. Thus it is common for SRE to
>> > use different quota when using kafka-reassign-partitions.sh to move
>> replica
>> > between brokers.
>> >
>> > However, the only reason for moving replica between log directories of
>> the
>> > same broker is to balance cluster resource. Thus the option to
>> > specify intra.broker.throttled.rate in the tool is not that useful. I am
>> > inclined not to add this option to keep this tool's usage simpler.
>> >
>> >
>> > >
>> > > 12. DescribeDirsRequest
>> > > 12.1 In other requests like CreateTopicRequest, we return an empty
>> list
>> > in
>> > > the response for an empty input list. If the input list is null, we
>> > return
>> > > everything. We should probably follow the same convention here.
>> > >
>> >
>> > Thanks. I wasn't aware of this convention. I have change
>> > DescribeDirsRequest so that "null" indicates "all".
>> >
>> >
>> > > 12.2 Do we need the topics field? Since the request is about log
>> dirs, it
>> > > makes sense to specify the log dirs. But it's weird to specify topics.
>> > >
>> >
>> > The topics field is not necessary. But it is useful to reduce the
>> response
>> > size in case user are only interested in the status of a few topics. For
>> > example, user may have initiated the reassignment of a given replica
>> from
>> > one log directory to another log directory on the same broker, and the
>> user
>> > only wants to check the status of this given partition by looking
>> > at DescribeDirsResponse. Thus this field is useful.
>> >
>> > I am not sure if it is weird to call this request DescribeDirsRequest.
>> The
>> > response is a map from log directory to information to some partitions
>> on
>> > the log directory. Do you think we need to change the name of the
>> request?
>> >
>> >
>> > > 12.3 DescribeDirsResponsePartition: Should we include firstOffset and
>> > > nextOffset in the response? That could be useful to track the
>> progress of
>> > > the movement.
>> > >
>> >
>> > Yeah good point. I agree it is useful to include logEndOffset in the
>> > response. According to Log.scala doc the logEndOffset is equivalent to
>> the
>> > nextOffset. User can track progress by checking the difference between
>> > logEndOffset of the given partition in the source and destination log
>> > directories. I have added logEndOffset to the
>> DescribeDirsResponsePartition
>> > in the KIP.
>> >
>> > But it seems that we don't need firstOffset in the response. Do you
>> think
>> > firstOffset is still needed?
>> >
>> >
>> > >
>> > > 13. ChangeReplicaDirResponse: Do we need error code at both levels?
>> > >
>> >
>> > My bad. It is not needed. I have removed request level error code. I
>> also
>> > added ChangeReplicaDirRequestTopic and ChangeReplicaDirResponseTopic to
>> > reduce duplication of the "topic" string in the request and response.
>> >
>> >
>> > >
>> > > 14. num.replica.move.threads: Does it default to # log dirs?
>> > >
>> >
>> > No. It doesn't. I expect default number to be set to a conservative
>> value
>> > such as 3. It may be surprising to user if the number of threads
>> increase
>> > just because they have assigned more log directories to Kafka broker.
>> >
>> > It seems that the number of replica move threads doesn't have to depend
>> on
>> > the number of log directories. It is possible to have one thread that
>> moves
>> > replicas across all log directories. On the other hand we can have
>> multiple
>> > threads to move replicas to the same log directory. For example, if
>> broker
>> > uses SSD, the CPU instead of disk IO may be the replica move bottleneck
>> and
>> > it will be faster to move replicas using multiple threads per log
>> > directory.
>> >
>> >
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > I just made one correction in the KIP. If broker receives
>> > > > ChangeReplicaDirRequest and the replica hasn't been created there,
>> the
>> > > > broker will respond ReplicaNotAvailableException.
>> > > > The kafka-reassignemnt-partitions.sh will need to re-send
>> > > > ChangeReplicaDirRequest in this case in order to wait for
>> controller to
>> > > > send LeaderAndIsrRequest to broker. The previous approach of
>> creating
>> > an
>> > > > empty directory seems hacky.
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hey Jun,
>> > > > >
>> > > > > Thanks for your comments! I have updated the KIP to address your
>> > > > comments.
>> > > > > Please see my reply inline.
>> > > > >
>> > > > > Can you let me know if the latest KIP has addressed your comments?
>> > > > >
>> > > > > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > >
>> > > > >> Hi, Dong,
>> > > > >>
>> > > > >> Thanks for the reply.
>> > > > >>
>> > > > >> 1.3 So the thread gets the lock, checks if caught up and releases
>> > the
>> > > > lock
>> > > > >> if not? Then, in the case when there is continuous incoming data,
>> > the
>> > > > >> thread may never get a chance to swap. One way to address this is
>> > when
>> > > > the
>> > > > >> thread is getting really close in catching up, just hold onto the
>> > lock
>> > > > >> until the thread fully catches up.
>> > > > >>
>> > > > >
>> > > > > Yes, that was my original solution. I see your point that the lock
>> > may
>> > > > not
>> > > > > be fairly assigned to ReplicaMoveThread and RequestHandlerThread
>> when
>> > > > there
>> > > > > is frequent incoming requets. You solution should address the
>> problem
>> > > > and I
>> > > > > have updated the KIP to use it.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> 2.3 So, you are saying that the partition reassignment tool can
>> > first
>> > > > send
>> > > > >> a ChangeReplicaDirRequest to relevant brokers to establish the
>> log
>> > dir
>> > > > for
>> > > > >> replicas not created yet, then trigger the partition movement
>> across
>> > > > >> brokers through the controller? That's actually a good idea.
>> Then,
>> > we
>> > > > can
>> > > > >> just leave LeaderAndIsrRequest as it is.
>> > > > >
>> > > > >
>> > > > > Yes, that is what I plan to do. If broker receives a
>> > > > > ChangeReplicaDirRequest while it is not leader or follower of the
>> > > > > partition, the broker will create an empty Log instance (i.e. a
>> > > directory
>> > > > > named topicPartition) in the destination log directory so that the
>> > > > replica
>> > > > > will be placed there when broker receives LeaderAndIsrRequest from
>> > the
>> > > > > broker. The broker should clean up empty those Log instances on
>> > startup
>> > > > > just in case a ChangeReplicaDirRequest was mistakenly sent to a
>> > broker
>> > > > that
>> > > > > was not meant to be follower/leader of the partition..
>> > > > >
>> > > > >
>> > > > >> Another thing related to
>> > > > >> ChangeReplicaDirRequest.
>> > > > >> Since this request may take long to complete, I am not sure if we
>> > > should
>> > > > >> wait for the movement to complete before respond. While waiting
>> for
>> > > the
>> > > > >> movement to complete, the idle connection may be killed or the
>> > client
>> > > > may
>> > > > >> be gone already. An alternative is to return immediately and add
>> a
>> > new
>> > > > >> request like CheckReplicaDirRequest to see if the movement has
>> > > > completed.
>> > > > >> The tool can take advantage of that to check the status.
>> > > > >>
>> > > > >
>> > > > > I agree with your concern and solution. We need request to query
>> the
>> > > > > partition -> log_directory mapping on the broker. I have updated
>> the
>> > > KIP
>> > > > to
>> > > > > remove need for ChangeReplicaDirRequestPurgatory.
>> > > > > Instead, kafka-reassignemnt-partitions.sh will send
>> > > DescribeDirsRequest
>> > > > > to brokers when user wants to verify the partition assignment.
>> Since
>> > we
>> > > > > need this DescribeDirsRequest anyway, we can also use this
>> request to
>> > > > > expose stats like the individual log size instead of using JMX.
>> One
>> > > > > drawback of using JMX is that user has to manage the JMX port and
>> > > related
>> > > > > credentials if they haven't already done this, which is the case
>> at
>> > > > > LinkedIn.
>> > > > >
>> > > > >
>> > > > >> Thanks,
>> > > > >>
>> > > > >> Jun
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin <lindon...@gmail.com>
>> > wrote:
>> > > > >>
>> > > > >> > Hey Jun,
>> > > > >> >
>> > > > >> > Thanks for the detailed explanation. I will use the separate
>> > thread
>> > > > >> pool to
>> > > > >> > move replica between log directories. I will let you know when
>> the
>> > > KIP
>> > > > >> has
>> > > > >> > been updated to use a separate thread pool.
>> > > > >> >
>> > > > >> > Here is my response to your other questions:
>> > > > >> >
>> > > > >> > 1.3 My idea is that the ReplicaMoveThread that moves data
>> should
>> > get
>> > > > the
>> > > > >> > lock before checking whether the replica in the destination log
>> > > > >> directory
>> > > > >> > has caught up. If the new replica has caught up, then the
>> > > > >> ReplicaMoveThread
>> > > > >> > should swaps the replica while it is still holding the lock.
>> The
>> > > > >> > ReplicaFetcherThread or RequestHandlerThread will not be able
>> to
>> > > > append
>> > > > >> > data to the replica in the source replica during this period
>> > because
>> > > > >> they
>> > > > >> > can not get the lock. Does this address the problem?
>> > > > >> >
>> > > > >> > 2.3 I get your point that we want to keep controller simpler.
>> If
>> > > admin
>> > > > >> tool
>> > > > >> > can send ChangeReplicaDirRequest to move data within a broker,
>> > then
>> > > > >> > controller probably doesn't even need to include log directory
>> > path
>> > > in
>> > > > >> the
>> > > > >> > LeaderAndIsrRequest. How about this: controller will only deal
>> > with
>> > > > >> > reassignment across brokers as it does now. If user specified
>> > > > >> destination
>> > > > >> > replica for any disk, the admin tool will send
>> > > ChangeReplicaDirRequest
>> > > > >> and
>> > > > >> > wait for response from broker to confirm that all replicas have
>> > been
>> > > > >> moved
>> > > > >> > to the destination log direcotry. The broker will put
>> > > > >> > ChangeReplicaDirRequset in a purgatory and respond either when
>> the
>> > > > >> movement
>> > > > >> > is completed or when the request has timed-out.
>> > > > >> >
>> > > > >> > 4. I agree that we can expose these metrics via JMX. But I am
>> not
>> > > sure
>> > > > >> if
>> > > > >> > it can be obtained easily with good performance using either
>> > > existing
>> > > > >> tools
>> > > > >> > or new script in kafka. I will ask SREs for their opinion.
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> > Dong
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao <j...@confluent.io>
>> wrote:
>> > > > >> >
>> > > > >> > > Hi, Dong,
>> > > > >> > >
>> > > > >> > > Thanks for the updated KIP. A few more comments below.
>> > > > >> > >
>> > > > >> > > 1.1 and 1.2: I am still not sure there is enough benefit of
>> > > reusing
>> > > > >> > > ReplicaFetchThread
>> > > > >> > > to move data across disks.
>> > > > >> > > (a) A big part of ReplicaFetchThread is to deal with issuing
>> and
>> > > > >> tracking
>> > > > >> > > fetch requests. So, it doesn't feel that we get much from
>> > reusing
>> > > > >> > > ReplicaFetchThread
>> > > > >> > > only to disable the fetching part.
>> > > > >> > > (b) The leader replica has no ReplicaFetchThread to start
>> with.
>> > It
>> > > > >> feels
>> > > > >> > > weird to start one just for intra broker data movement.
>> > > > >> > > (c) The ReplicaFetchThread is per broker. Intuitively, the
>> > number
>> > > of
>> > > > >> > > threads doing intra broker data movement should be related to
>> > the
>> > > > >> number
>> > > > >> > of
>> > > > >> > > disks in the broker, not the number of brokers in the
>> cluster.
>> > > > >> > > (d) If the destination disk fails, we want to stop the intra
>> > > broker
>> > > > >> data
>> > > > >> > > movement, but want to continue inter broker replication. So,
>> > > > >> logically,
>> > > > >> > it
>> > > > >> > > seems it's better to separate out the two.
>> > > > >> > > (e) I am also not sure if we should reuse the existing
>> > throttling
>> > > > for
>> > > > >> > > replication. It's designed to handle traffic across brokers
>> and
>> > > the
>> > > > >> > > delaying is done in the fetch request. So, if we are not
>> doing
>> > > > >> > > fetching in ReplicaFetchThread,
>> > > > >> > > I am not sure the existing throttling is effective. Also,
>> when
>> > > > >> specifying
>> > > > >> > > the throttling of moving data across disks, it seems the user
>> > > > >> shouldn't
>> > > > >> > > care about whether a replica is a leader or a follower.
>> Reusing
>> > > the
>> > > > >> > > existing throttling config name will be awkward in this
>> regard.
>> > > > >> > > (f) It seems it's simpler and more consistent to use a
>> separate
>> > > > thread
>> > > > >> > pool
>> > > > >> > > for local data movement (for both leader and follower
>> replicas).
>> > > > This
>> > > > >> > > process can then be configured (e.g. number of threads, etc)
>> and
>> > > > >> > throttled
>> > > > >> > > independently.
>> > > > >> > >
>> > > > >> > > 1.3 Yes, we will need some synchronization there. So, if the
>> > > > movement
>> > > > >> > > thread catches up, gets the lock to do the swap, but realizes
>> > that
>> > > > new
>> > > > >> > data
>> > > > >> > > is added, it has to continue catching up while holding the
>> lock?
>> > > > >> > >
>> > > > >> > > 2.3 The benefit of including the desired log directory in
>> > > > >> > > LeaderAndIsrRequest
>> > > > >> > > during partition reassignment is that the controller doesn't
>> > need
>> > > to
>> > > > >> > track
>> > > > >> > > the progress for disk movement. So, you don't need the
>> > additional
>> > > > >> > > BrokerDirStateUpdateRequest. Then the controller never needs
>> to
>> > > > issue
>> > > > >> > > ChangeReplicaDirRequest.
>> > > > >> > > Only the admin tool will issue ChangeReplicaDirRequest to
>> move
>> > > data
>> > > > >> > within
>> > > > >> > > a broker. I agree that this makes LeaderAndIsrRequest more
>> > > > >> complicated,
>> > > > >> > but
>> > > > >> > > that seems simpler than changing the controller to track
>> > > additional
>> > > > >> > states
>> > > > >> > > during partition reassignment.
>> > > > >> > >
>> > > > >> > > 4. We want to make a decision on how to expose the stats. So
>> > far,
>> > > we
>> > > > >> are
>> > > > >> > > exposing stats like the individual log size as JMX. So, one
>> way
>> > is
>> > > > to
>> > > > >> > just
>> > > > >> > > add new jmx to expose the log directory of individual
>> replicas.
>> > > > >> > >
>> > > > >> > > Thanks,
>> > > > >> > >
>> > > > >> > > Jun
>> > > > >> > >
>> > > > >> > >
>> > > > >> > > On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin <
>> lindon...@gmail.com>
>> > > > >> wrote:
>> > > > >> > >
>> > > > >> > > > Hey Jun,
>> > > > >> > > >
>> > > > >> > > > Thanks for all the comments! Please see my answer below. I
>> > have
>> > > > >> updated
>> > > > >> > > the
>> > > > >> > > > KIP to address most of the questions and make the KIP
>> easier
>> > to
>> > > > >> > > understand.
>> > > > >> > > >
>> > > > >> > > > Thanks,
>> > > > >> > > > Dong
>> > > > >> > > >
>> > > > >> > > > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao <j...@confluent.io>
>> > > wrote:
>> > > > >> > > >
>> > > > >> > > > > Hi, Dong,
>> > > > >> > > > >
>> > > > >> > > > > Thanks for the KIP. A few comments below.
>> > > > >> > > > >
>> > > > >> > > > > 1. For moving data across directories
>> > > > >> > > > > 1.1 I am not sure why we want to use
>> ReplicaFetcherThread to
>> > > > move
>> > > > >> > data
>> > > > >> > > > > around in the leader. ReplicaFetchThread fetches data
>> from
>> > > > socket.
>> > > > >> > For
>> > > > >> > > > > moving data locally, it seems that we want to avoid the
>> > socket
>> > > > >> > > overhead.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > The purpose of using ReplicaFetchThread is to re-use
>> existing
>> > > > thread
>> > > > >> > > > instead of creating more threads and make our thread model
>> > more
>> > > > >> > complex.
>> > > > >> > > It
>> > > > >> > > > seems like a nature choice for copying data between disks
>> > since
>> > > it
>> > > > >> is
>> > > > >> > > > similar to copying data between brokers. Another reason is
>> > that
>> > > if
>> > > > >> the
>> > > > >> > > > replica to be moved is a follower, we don't need lock to
>> swap
>> > > > >> replicas
>> > > > >> > > when
>> > > > >> > > > destination replica has caught up, since the same thread
>> which
>> > > is
>> > > > >> > > fetching
>> > > > >> > > > data from leader will swap the replica.
>> > > > >> > > >
>> > > > >> > > > The ReplicaFetchThread will not incur socket overhead while
>> > > > copying
>> > > > >> > data
>> > > > >> > > > between disks. It will read directly from source disk (as
>> we
>> > do
>> > > > when
>> > > > >> > > > processing FetchRequest) and write to destination disk (as
>> we
>> > do
>> > > > >> when
>> > > > >> > > > processing ProduceRequest).
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > > 1.2 I am also not sure about moving data in the
>> > > > >> ReplicaFetcherThread
>> > > > >> > in
>> > > > >> > > > the
>> > > > >> > > > > follower. For example, I am not sure setting
>> > > > >> replica.fetch.max.wait
>> > > > >> > to
>> > > > >> > > 0
>> > > > >> > > > >  is ideal. It may not always be effective since a fetch
>> > > request
>> > > > in
>> > > > >> > the
>> > > > >> > > > > ReplicaFetcherThread could be arbitrarily delayed due to
>> > > > >> replication
>> > > > >> > > > > throttling on the leader. In general, the data movement
>> > logic
>> > > > >> across
>> > > > >> > > > disks
>> > > > >> > > > > seems different from that in ReplicaFetcherThread. So, I
>> am
>> > > not
>> > > > >> sure
>> > > > >> > > why
>> > > > >> > > > > they need to be coupled.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > While it may not be the most efficient way to copy data
>> > between
>> > > > >> local
>> > > > >> > > > disks, it will be at least as efficient as copying data
>> from
>> > > > leader
>> > > > >> to
>> > > > >> > > the
>> > > > >> > > > destination disk. The expected goal of KIP-113 is to enable
>> > data
>> > > > >> > movement
>> > > > >> > > > between disks with no less efficiency than what we do now
>> when
>> > > > >> moving
>> > > > >> > > data
>> > > > >> > > > between brokers. I think we can optimize its performance
>> using
>> > > > >> separate
>> > > > >> > > > thread if the performance is not good enough.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > > 1.3 Could you add a bit more details on how we swap the
>> > > replicas
>> > > > >> when
>> > > > >> > > the
>> > > > >> > > > > new ones are fully caught up? For example, what happens
>> when
>> > > the
>> > > > >> new
>> > > > >> > > > > replica in the new log directory is caught up, but when
>> we
>> > > want
>> > > > >> to do
>> > > > >> > > the
>> > > > >> > > > > swap, some new data has arrived?
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > If the replica is a leader, then ReplicaFetcherThread will
>> > > perform
>> > > > >> the
>> > > > >> > > > replacement. Proper lock is needed to prevent
>> > > KafkaRequestHandler
>> > > > >> from
>> > > > >> > > > appending data to the topicPartition.log on the source
>> disks
>> > > > before
>> > > > >> > this
>> > > > >> > > > replacement is completed by ReplicaFetcherThread.
>> > > > >> > > >
>> > > > >> > > > If the replica is a follower, because the same
>> > > ReplicaFetchThread
>> > > > >> which
>> > > > >> > > > fetches data from leader will also swap the replica , no
>> lock
>> > is
>> > > > >> > needed.
>> > > > >> > > >
>> > > > >> > > > I have updated the KIP to specify both more explicitly.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > > 1.4 Do we need to do the .move at the log segment level
>> or
>> > > could
>> > > > >> we
>> > > > >> > > just
>> > > > >> > > > do
>> > > > >> > > > > that at the replica directory level? Renaming just a
>> > directory
>> > > > is
>> > > > >> > much
>> > > > >> > > > > faster than renaming the log segments.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > Great point. I have updated the KIP to rename the log
>> > directory
>> > > > >> > instead.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > > 1.5 Could you also describe a bit what happens when
>> either
>> > the
>> > > > >> source
>> > > > >> > > or
>> > > > >> > > > > the target log directory fails while the data moving is
>> in
>> > > > >> progress?
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > If source log directory fails, then the replica movement
>> will
>> > > stop
>> > > > >> and
>> > > > >> > > the
>> > > > >> > > > source replica is marked offline. If destination log
>> directory
>> > > > >> fails,
>> > > > >> > > then
>> > > > >> > > > the replica movement will stop. I have updated the KIP to
>> > > clarify
>> > > > >> this.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > >
>> > > > >> > > > > 2. For partition reassignment.
>> > > > >> > > > > 2.1 I am not sure if the controller can block on
>> > > > >> > > ChangeReplicaDirRequest.
>> > > > >> > > > > Data movement may take a long time to complete. If there
>> is
>> > an
>> > > > >> > > > outstanding
>> > > > >> > > > > request from the controller to a broker, that broker
>> won't
>> > be
>> > > > >> able to
>> > > > >> > > > > process any new request from the controller. So if
>> another
>> > > event
>> > > > >> > (e.g.
>> > > > >> > > > > broker failure) happens when the data movement is in
>> > progress,
>> > > > >> > > subsequent
>> > > > >> > > > > LeaderAnIsrRequest will be delayed.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > Yeah good point. I missed the fact that there is be only
>> one
>> > > > >> inflight
>> > > > >> > > > request from controller to broker.
>> > > > >> > > >
>> > > > >> > > > How about I add a request, e.g.
>> BrokerDirStateUpdateRequest,
>> > > which
>> > > > >> maps
>> > > > >> > > > topicPartition to log directory and can be sent from
>> broker to
>> > > > >> > controller
>> > > > >> > > > to indicate completion?
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > > 2.2 in the KIP, the partition reassignment tool is also
>> used
>> > > for
>> > > > >> > cases
>> > > > >> > > > > where an admin just wants to balance the existing data
>> > across
>> > > > log
>> > > > >> > > > > directories in the broker. In this case, it seems that
>> it's
>> > > over
>> > > > >> > > killing
>> > > > >> > > > to
>> > > > >> > > > > have the process go through the controller. A simpler
>> > approach
>> > > > is
>> > > > >> to
>> > > > >> > > > issue
>> > > > >> > > > > an RPC request to the broker directly.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > I agree we can optimize this case. It is just that we have
>> to
>> > > add
>> > > > >> new
>> > > > >> > > logic
>> > > > >> > > > or code path to handle a scenario that is already covered
>> by
>> > the
>> > > > >> more
>> > > > >> > > > complicated scenario. I will add it to the KIP.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > > 2.3 When using the partition reassignment tool to move
>> > > replicas
>> > > > >> > across
>> > > > >> > > > > brokers, it make sense to be able to specify the log
>> > directory
>> > > > of
>> > > > >> the
>> > > > >> > > > newly
>> > > > >> > > > > created replicas. The KIP does that in two separate
>> requests
>> > > > >> > > > > ChangeReplicaDirRequest and LeaderAndIsrRequest, and
>> tracks
>> > > the
>> > > > >> > > progress
>> > > > >> > > > of
>> > > > >> > > > > each independently. An alternative is to do that just in
>> > > > >> > > > > LeaderAndIsrRequest.
>> > > > >> > > > > That way, the new replicas will be created in the right
>> log
>> > > dir
>> > > > in
>> > > > >> > the
>> > > > >> > > > > first place and the controller just needs to track the
>> > > progress
>> > > > of
>> > > > >> > > > > partition reassignment in the current way.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > I agree it is better to use one request instead of two to
>> > > request
>> > > > >> > replica
>> > > > >> > > > movement between disks. But I think the performance
>> advantage
>> > of
>> > > > >> doing
>> > > > >> > so
>> > > > >> > > > is negligible because we trigger replica assignment much
>> less
>> > > than
>> > > > >> all
>> > > > >> > > > other kinds of events in the Kafka cluster. I am not sure
>> that
>> > > the
>> > > > >> > > benefit
>> > > > >> > > > of doing this is worth the effort to add an optional string
>> > > field
>> > > > in
>> > > > >> > the
>> > > > >> > > > LeaderAndIsrRequest. Also if we add this optional field in
>> the
>> > > > >> > > > LeaderAndIsrRequest, we probably want to remove
>> > > > >> ChangeReplicaDirRequest
>> > > > >> > > to
>> > > > >> > > > avoid having two requests doing the same thing. But it
>> means
>> > > user
>> > > > >> > script
>> > > > >> > > > can not send request directly to the broker to trigger
>> replica
>> > > > >> movement
>> > > > >> > > > between log directories.
>> > > > >> > > >
>> > > > >> > > > I will do it if you are strong about this optimzation.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > >
>> > > > >> > > > > 3. /admin/reassign_partitions: Including the log dir in
>> > every
>> > > > >> replica
>> > > > >> > > may
>> > > > >> > > > > not be efficient. We could include a list of log
>> directories
>> > > and
>> > > > >> > > > reference
>> > > > >> > > > > the index of the log directory in each replica.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > Good point. I have updated the KIP to use this solution.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > >
>> > > > >> > > > > 4. DescribeDirsRequest: The stats in the request are
>> already
>> > > > >> > available
>> > > > >> > > > from
>> > > > >> > > > > JMX. Do we need the new request?
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > Does JMX also include the state (i.e. offline or online) of
>> > each
>> > > > log
>> > > > >> > > > directory and the log directory of each replica? If not,
>> then
>> > > > maybe
>> > > > >> we
>> > > > >> > > > still need DescribeDirsRequest?
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > >
>> > > > >> > > > > 5. We want to be consistent on ChangeReplicaDirRequest vs
>> > > > >> > > > > ChangeReplicaRequest.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > I think ChangeReplicaRequest and ChangeReplicaResponse is
>> my
>> > > typo.
>> > > > >> > Sorry,
>> > > > >> > > > they are fixed now.
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > >
>> > > > >> > > > > Thanks,
>> > > > >> > > > >
>> > > > >> > > > > Jun
>> > > > >> > > > >
>> > > > >> > > > >
>> > > > >> > > > > On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin <
>> > lindon...@gmail.com
>> > > >
>> > > > >> > wrote:
>> > > > >> > > > >
>> > > > >> > > > > > Hey ALexey,
>> > > > >> > > > > >
>> > > > >> > > > > > Thanks for all the comments!
>> > > > >> > > > > >
>> > > > >> > > > > > I have updated the KIP to specify how we enforce
>> quota. I
>> > > also
>> > > > >> > > updated
>> > > > >> > > > > the
>> > > > >> > > > > > "The thread model and broker logic for moving replica
>> data
>> > > > >> between
>> > > > >> > > log
>> > > > >> > > > > > directories" to make it easier to read. You can find
>> the
>> > > exact
>> > > > >> > change
>> > > > >> > > > > here
>> > > > >> > > > > > <https://cwiki.apache.org/conf
>> > > luence/pages/diffpagesbyversio
>> > > > >> > > > > > n.action?pageId=67638408&selec
>> > > tedPageVersions=5&selectedPage
>> > > > >> > > > Versions=6>.
>> > > > >> > > > > > The idea is to use the same replication quota mechanism
>> > > > >> introduced
>> > > > >> > in
>> > > > >> > > > > > KIP-73.
>> > > > >> > > > > >
>> > > > >> > > > > > Thanks,
>> > > > >> > > > > > Dong
>> > > > >> > > > > >
>> > > > >> > > > > >
>> > > > >> > > > > >
>> > > > >> > > > > > On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <
>> > > > >> > > aozerit...@yandex.ru
>> > > > >> > > > >
>> > > > >> > > > > > wrote:
>> > > > >> > > > > >
>> > > > >> > > > > > >
>> > > > >> > > > > > >
>> > > > >> > > > > > > 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:
>> > > > >> > > > > > > > Hey Alexey,
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > Thanks. I think we agreed that the suggested
>> solution
>> > > > >> doesn't
>> > > > >> > > work
>> > > > >> > > > in
>> > > > >> > > > > > > > general for kafka users. To answer your questions:
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > 1. I agree we need quota to rate limit replica
>> > movement
>> > > > >> when a
>> > > > >> > > > broker
>> > > > >> > > > > > is
>> > > > >> > > > > > > > moving a "leader" replica. I will come up with
>> > solution,
>> > > > >> > probably
>> > > > >> > > > > > re-use
>> > > > >> > > > > > > > the config of replication quota introduced in
>> KIP-73.
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > 2. Good point. I agree that this is a problem in
>> > > general.
>> > > > >> If is
>> > > > >> > > no
>> > > > >> > > > > new
>> > > > >> > > > > > > data
>> > > > >> > > > > > > > on that broker, with current default value of
>> > > > >> > > > > > replica.fetch.wait.max.ms
>> > > > >> > > > > > > > and replica.fetch.max.bytes, the replica will be
>> moved
>> > > at
>> > > > >> only
>> > > > >> > 2
>> > > > >> > > > MBps
>> > > > >> > > > > > > > throughput. I think the solution is for broker to
>> set
>> > > > >> > > > > > > > replica.fetch.wait.max.ms to 0 in its
>> FetchRequest if
>> > > the
>> > > > >> > > > > > corresponding
>> > > > >> > > > > > > > ReplicaFetcherThread needs to move some replica to
>> > > another
>> > > > >> > disk.
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > 3. I have updated the KIP to mention that the read
>> > size
>> > > > of a
>> > > > >> > > given
>> > > > >> > > > > > > > partition is configured using
>> replica.fetch.max.bytes
>> > > when
>> > > > >> we
>> > > > >> > > move
>> > > > >> > > > > > > replicas
>> > > > >> > > > > > > > between disks.
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > Please see this
>> > > > >> > > > > > > > <https://cwiki.apache.org/conf
>> > > > >> luence/pages/diffpagesbyversio
>> > > > >> > > > n.action
>> > > > >> > > > > ?
>> > > > >> > > > > > > pageId=67638408&selectedPageVe
>> > > > rsions=4&selectedPageVersions=
>> > > > >> 5>
>> > > > >> > > > > > > > for the change of the KIP. I will come up with a
>> > > solution
>> > > > to
>> > > > >> > > > throttle
>> > > > >> > > > > > > > replica movement when a broker is moving a "leader"
>> > > > replica.
>> > > > >> > > > > > >
>> > > > >> > > > > > > Thanks. It looks great.
>> > > > >> > > > > > >
>> > > > >> > > > > > > >
>> > > > >> > > > > > > > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <
>> > > > >> > > > > > aozerit...@yandex.ru>
>> > > > >> > > > > > > > wrote:
>> > > > >> > > > > > > >
>> > > > >> > > > > > > >>  23.01.2017, 22:11, "Dong Lin" <
>> lindon...@gmail.com
>> > >:
>> > > > >> > > > > > > >>  > Thanks. Please see my comment inline.
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey
>> Ozeritsky
>> > <
>> > > > >> > > > > > > aozerit...@yandex.ru>
>> > > > >> > > > > > > >>  > wrote:
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  >> 13.01.2017, 22:29, "Dong Lin" <
>> > lindon...@gmail.com
>> > > >:
>> > > > >> > > > > > > >>  >> > Hey Alexey,
>> > > > >> > > > > > > >>  >> >
>> > > > >> > > > > > > >>  >> > Thanks for your review and the alternative
>> > > > approach.
>> > > > >> > Here
>> > > > >> > > is
>> > > > >> > > > > my
>> > > > >> > > > > > > >>  >> > understanding of your patch. kafka's
>> background
>> > > > >> threads
>> > > > >> > > are
>> > > > >> > > > > used
>> > > > >> > > > > > > to
>> > > > >> > > > > > > >>  move
>> > > > >> > > > > > > >>  >> > data between replicas. When data movement is
>> > > > >> triggered,
>> > > > >> > > the
>> > > > >> > > > > log
>> > > > >> > > > > > > will
>> > > > >> > > > > > > >>  be
>> > > > >> > > > > > > >>  >> > rolled and the new logs will be put in the
>> new
>> > > > >> > directory,
>> > > > >> > > > and
>> > > > >> > > > > > > >>  background
>> > > > >> > > > > > > >>  >> > threads will move segment from old
>> directory to
>> > > new
>> > > > >> > > > directory.
>> > > > >> > > > > > > >>  >> >
>> > > > >> > > > > > > >>  >> > It is important to note that KIP-112 is
>> > intended
>> > > to
>> > > > >> work
>> > > > >> > > > with
>> > > > >> > > > > > > >>  KIP-113 to
>> > > > >> > > > > > > >>  >> > support JBOD. I think your solution is
>> > definitely
>> > > > >> > simpler
>> > > > >> > > > and
>> > > > >> > > > > > > better
>> > > > >> > > > > > > >>  >> under
>> > > > >> > > > > > > >>  >> > the current kafka implementation that a
>> broker
>> > > will
>> > > > >> fail
>> > > > >> > > if
>> > > > >> > > > > any
>> > > > >> > > > > > > disk
>> > > > >> > > > > > > >>  >> fails.
>> > > > >> > > > > > > >>  >> > But I am not sure if we want to allow
>> broker to
>> > > run
>> > > > >> with
>> > > > >> > > > > partial
>> > > > >> > > > > > > >>  disks
>> > > > >> > > > > > > >>  >> > failure. Let's say the a replica is being
>> moved
>> > > > from
>> > > > >> > > > > log_dir_old
>> > > > >> > > > > > > to
>> > > > >> > > > > > > >>  >> > log_dir_new and then log_dir_old stops
>> working
>> > > due
>> > > > to
>> > > > >> > disk
>> > > > >> > > > > > > failure.
>> > > > >> > > > > > > >>  How
>> > > > >> > > > > > > >>  >> > would your existing patch handles it? To
>> make
>> > the
>> > > > >> > > scenario a
>> > > > >> > > > > bit
>> > > > >> > > > > > > more
>> > > > >> > > > > > > >>  >>
>> > > > >> > > > > > > >>  >> We will lose log_dir_old. After broker
>> restart we
>> > > can
>> > > > >> read
>> > > > >> > > the
>> > > > >> > > > > > data
>> > > > >> > > > > > > >>  from
>> > > > >> > > > > > > >>  >> log_dir_new.
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  > No, you probably can't. This is because the
>> broker
>> > > > >> doesn't
>> > > > >> > > have
>> > > > >> > > > > > > *all* the
>> > > > >> > > > > > > >>  > data for this partition. For example, say the
>> > broker
>> > > > has
>> > > > >> > > > > > > >>  > partition_segement_1, partition_segment_50 and
>> > > > >> > > > > > partition_segment_100
>> > > > >> > > > > > > on
>> > > > >> > > > > > > >>  the
>> > > > >> > > > > > > >>  > log_dir_old. partition_segment_100, which has
>> the
>> > > > latest
>> > > > >> > > data,
>> > > > >> > > > > has
>> > > > >> > > > > > > been
>> > > > >> > > > > > > >>  > moved to log_dir_new, and the log_dir_old fails
>> > > before
>> > > > >> > > > > > > >>  partition_segment_50
>> > > > >> > > > > > > >>  > and partition_segment_1 is moved to
>> log_dir_new.
>> > > When
>> > > > >> > broker
>> > > > >> > > > > > > re-starts,
>> > > > >> > > > > > > >>  it
>> > > > >> > > > > > > >>  > won't have partition_segment_50. This causes
>> > problem
>> > > > if
>> > > > >> > > broker
>> > > > >> > > > is
>> > > > >> > > > > > > elected
>> > > > >> > > > > > > >>  > leader and consumer wants to consume data in
>> the
>> > > > >> > > > > > partition_segment_1.
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  Right.
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  >> > complicated, let's say the broker is
>> shtudown,
>> > > > >> > > log_dir_old's
>> > > > >> > > > > > disk
>> > > > >> > > > > > > >>  fails,
>> > > > >> > > > > > > >>  >> > and the broker starts. In this case broker
>> > > doesn't
>> > > > >> even
>> > > > >> > > know
>> > > > >> > > > > if
>> > > > >> > > > > > > >>  >> log_dir_new
>> > > > >> > > > > > > >>  >> > has all the data needed for this replica. It
>> > > > becomes
>> > > > >> a
>> > > > >> > > > problem
>> > > > >> > > > > > if
>> > > > >> > > > > > > the
>> > > > >> > > > > > > >>  >> > broker is elected leader of this partition
>> in
>> > > this
>> > > > >> case.
>> > > > >> > > > > > > >>  >>
>> > > > >> > > > > > > >>  >> log_dir_new contains the most recent data so
>> we
>> > > will
>> > > > >> lose
>> > > > >> > > the
>> > > > >> > > > > tail
>> > > > >> > > > > > > of
>> > > > >> > > > > > > >>  >> partition.
>> > > > >> > > > > > > >>  >> This is not a big problem for us because we
>> > already
>> > > > >> delete
>> > > > >> > > > tails
>> > > > >> > > > > > by
>> > > > >> > > > > > > >>  hand
>> > > > >> > > > > > > >>  >> (see https://issues.apache.org/jira
>> > > > /browse/KAFKA-1712
>> > > > >> ).
>> > > > >> > > > > > > >>  >> Also we dont use authomatic leader balancing
>> > > > >> > > > > > > >>  (auto.leader.rebalance.enable=false),
>> > > > >> > > > > > > >>  >> so this partition becomes the leader with a
>> low
>> > > > >> > probability.
>> > > > >> > > > > > > >>  >> I think my patch can be modified to prohibit
>> the
>> > > > >> selection
>> > > > >> > > of
>> > > > >> > > > > the
>> > > > >> > > > > > > >>  leader
>> > > > >> > > > > > > >>  >> until the partition does not move completely.
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  > I guess you are saying that you have deleted
>> the
>> > > tails
>> > > > >> by
>> > > > >> > > hand
>> > > > >> > > > in
>> > > > >> > > > > > > your
>> > > > >> > > > > > > >>  own
>> > > > >> > > > > > > >>  > kafka branch. But KAFKA-1712 is not accepted
>> into
>> > > > Kafka
>> > > > >> > trunk
>> > > > >> > > > > and I
>> > > > >> > > > > > > am
>> > > > >> > > > > > > >>  not
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  No. We just modify segments mtime by cron job.
>> This
>> > > > works
>> > > > >> > with
>> > > > >> > > > > > vanilla
>> > > > >> > > > > > > >>  kafka.
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  > sure if it is the right solution. How would
>> this
>> > > > >> solution
>> > > > >> > > > address
>> > > > >> > > > > > the
>> > > > >> > > > > > > >>  > problem mentioned above?
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  If you need only fresh data and if you remove old
>> > data
>> > > > by
>> > > > >> > hands
>> > > > >> > > > > this
>> > > > >> > > > > > is
>> > > > >> > > > > > > >>  not a problem. But in general case
>> > > > >> > > > > > > >>  this is a problem of course.
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  > BTW, I am not sure the solution mentioned in
>> > > > KAFKA-1712
>> > > > >> is
>> > > > >> > > the
>> > > > >> > > > > > right
>> > > > >> > > > > > > way
>> > > > >> > > > > > > >>  to
>> > > > >> > > > > > > >>  > address its problem. Now that we have
>> timestamp in
>> > > the
>> > > > >> > > message
>> > > > >> > > > we
>> > > > >> > > > > > > can use
>> > > > >> > > > > > > >>  > that to delete old segement instead of relying
>> on
>> > > the
>> > > > >> log
>> > > > >> > > > segment
>> > > > >> > > > > > > mtime.
>> > > > >> > > > > > > >>  > Just some idea and we don't have to discuss
>> this
>> > > > problem
>> > > > >> > > here.
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  >> >
>> > > > >> > > > > > > >>  >> > The solution presented in the KIP attempts
>> to
>> > > > handle
>> > > > >> it
>> > > > >> > by
>> > > > >> > > > > > > replacing
>> > > > >> > > > > > > >>  >> > replica in an atomic version fashion after
>> the
>> > > log
>> > > > in
>> > > > >> > the
>> > > > >> > > > new
>> > > > >> > > > > > dir
>> > > > >> > > > > > > has
>> > > > >> > > > > > > >>  >> fully
>> > > > >> > > > > > > >>  >> > caught up with the log in the old dir. At at
>> > time
>> > > > the
>> > > > >> > log
>> > > > >> > > > can
>> > > > >> > > > > be
>> > > > >> > > > > > > >>  >> considered
>> > > > >> > > > > > > >>  >> > to exist on only one log directory.
>> > > > >> > > > > > > >>  >>
>> > > > >> > > > > > > >>  >> As I understand your solution does not cover
>> > > quotas.
>> > > > >> > > > > > > >>  >> What happens if someone starts to transfer 100
>> > > > >> partitions
>> > > > >> > ?
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  > Good point. Quota can be implemented in the
>> > future.
>> > > It
>> > > > >> is
>> > > > >> > > > > currently
>> > > > >> > > > > > > >>  > mentioned as as a potential future improvement
>> in
>> > > > >> KIP-112
>> > > > >> > > > > > > >>  > <https://cwiki.apache.org/conf
>> > > > luence/display/KAFKA/KIP-
>> > > > >> > 112%3
>> > > > >> > > > > > > >>  A+Handle+disk+failure+for+JBOD>.Thanks
>> > > > >> > > > > > > >>  > for the reminder. I will move it to KIP-113.
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  >> > If yes, it will read a ByteBufferMessageSet
>> > from
>> > > > >> > > > > > > topicPartition.log
>> > > > >> > > > > > > >>  and
>> > > > >> > > > > > > >>  >> append the message set to topicPartition.move
>> > > > >> > > > > > > >>  >>
>> > > > >> > > > > > > >>  >> i.e. processPartitionData will read data from
>> the
>> > > > >> > beginning
>> > > > >> > > of
>> > > > >> > > > > > > >>  >> topicPartition.log? What is the read size?
>> > > > >> > > > > > > >>  >> A ReplicaFetchThread reads many partitions so
>> if
>> > > one
>> > > > >> does
>> > > > >> > > some
>> > > > >> > > > > > > >>  complicated
>> > > > >> > > > > > > >>  >> work (= read a lot of data from disk)
>> everything
>> > > will
>> > > > >> slow
>> > > > >> > > > down.
>> > > > >> > > > > > > >>  >> I think read size should not be very big.
>> > > > >> > > > > > > >>  >>
>> > > > >> > > > > > > >>  >> On the other hand at this point
>> > > > (processPartitionData)
>> > > > >> one
>> > > > >> > > can
>> > > > >> > > > > use
>> > > > >> > > > > > > only
>> > > > >> > > > > > > >>  >> the new data (ByteBufferMessageSet from
>> > parameters)
>> > > > and
>> > > > >> > wait
>> > > > >> > > > > until
>> > > > >> > > > > > > >>  >> (topicPartition.move.smallestOffset <=
>> > > > >> > > > > > > topicPartition.log.smallestOff
>> > > > >> > > > > > > >>  set
>> > > > >> > > > > > > >>  >> && topicPartition.log.largestOffset ==
>> > > > >> > > > > > > topicPartition.log.largestOffs
>> > > > >> > > > > > > >>  et).
>> > > > >> > > > > > > >>  >> In this case the write speed to
>> > topicPartition.move
>> > > > and
>> > > > >> > > > > > > >>  topicPartition.log
>> > > > >> > > > > > > >>  >> will be the same so this will allow us to move
>> > many
>> > > > >> > > partitions
>> > > > >> > > > > to
>> > > > >> > > > > > > one
>> > > > >> > > > > > > >>  disk.
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  > The read size of a given partition is
>> configured
>> > > > >> > > > > > > >>  > using replica.fetch.max.bytes, which is the
>> same
>> > > size
>> > > > >> used
>> > > > >> > by
>> > > > >> > > > > > > >>  FetchRequest
>> > > > >> > > > > > > >>  > from follower to leader. If the broker is
>> moving a
>> > > > >> replica
>> > > > >> > > for
>> > > > >> > > > > > which
>> > > > >> > > > > > > it
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  OK. Could you mention it in KIP?
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  > acts as a follower, the disk write rate for
>> moving
>> > > > this
>> > > > >> > > replica
>> > > > >> > > > > is
>> > > > >> > > > > > at
>> > > > >> > > > > > > >>  most
>> > > > >> > > > > > > >>  > the rate it fetches from leader (assume it is
>> > > catching
>> > > > >> up
>> > > > >> > and
>> > > > >> > > > has
>> > > > >> > > > > > > >>  > sufficient data to read from leader, which is
>> > > subject
>> > > > to
>> > > > >> > > > > > > round-trip-time
>> > > > >> > > > > > > >>  > between itself and the leader. Thus this part
>> if
>> > > > >> probably
>> > > > >> > > fine
>> > > > >> > > > > even
>> > > > >> > > > > > > >>  without
>> > > > >> > > > > > > >>  > quota.
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  I think there are 2 problems
>> > > > >> > > > > > > >>  1. Without speed limiter this will not work good
>> > even
>> > > > for
>> > > > >> 1
>> > > > >> > > > > > partition.
>> > > > >> > > > > > > In
>> > > > >> > > > > > > >>  our production we had a problem so we did the
>> > throuput
>> > > > >> > limiter:
>> > > > >> > > > > > > >>  https://github.com/resetius/ka
>> > > > >> fka/commit/cda31dadb2f135743bf
>> > > > >> > > > > > > >>  41083062927886c5ddce1#diff-ffa
>> > > > >> 8861e850121997a534ebdde2929c6R
>> > > > >> > > 713
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  2. I dont understand how it will work in case of
>> big
>> > > > >> > > > > > > >>  replica.fetch.wait.max.ms and partition with
>> > > irregular
>> > > > >> flow.
>> > > > >> > > > > > > >>  For example someone could have
>> > > > replica.fetch.wait.max.ms
>> > > > >> > =10mi
>> > > > >> > > > nutes
>> > > > >> > > > > > and
>> > > > >> > > > > > > >>  partition that has very high data flow from
>> 12:00 to
>> > > > 13:00
>> > > > >> > and
>> > > > >> > > > zero
>> > > > >> > > > > > > flow
>> > > > >> > > > > > > >>  otherwise.
>> > > > >> > > > > > > >>  In this case processPartitionData could be called
>> > once
>> > > > per
>> > > > >> > > > > 10minutes
>> > > > >> > > > > > > so if
>> > > > >> > > > > > > >>  we start data moving in 13:01 it will be finished
>> > next
>> > > > >> day.
>> > > > >> > > > > > > >>
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  > But ff the broker is moving a replica for
>> which it
>> > > > acts
>> > > > >> as
>> > > > >> > a
>> > > > >> > > > > > leader,
>> > > > >> > > > > > > as
>> > > > >> > > > > > > >>  of
>> > > > >> > > > > > > >>  > current KIP the broker will keep reading from
>> > > > >> log_dir_old
>> > > > >> > and
>> > > > >> > > > > > append
>> > > > >> > > > > > > to
>> > > > >> > > > > > > >>  > log_dir_new without having to wait for
>> > > > round-trip-time.
>> > > > >> We
>> > > > >> > > > > probably
>> > > > >> > > > > > > need
>> > > > >> > > > > > > >>  > quota for this in the future.
>> > > > >> > > > > > > >>  >
>> > > > >> > > > > > > >>  >> >
>> > > > >> > > > > > > >>  >> > And to answer your question, yes
>> > > topicpartition.log
>> > > > >> > refers
>> > > > >> > > > to
>> > > > >> > > > > > > >>  >> > topic-paritition/segment.log.
>> > > > >> > > > > > > >>  >> >
>> > > > >> > > > > > > >>  >> > Thanks,
>> > > > >> > > > > > > >>  >> > Dong
>> > > > >> > > > > > > >>  >> >
>> > > > >> > > > > > > >>  >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey
>> > > Ozeritsky <
>> > > > >> > > > > > > >>  aozerit...@yandex.ru>
>> > > > >> > > > > > > >>  >> > wrote:
>> > > > >> > > > > > > >>  >> >
>> > > > >> > > > > > > >>  >> >> Hi,
>> > > > >> > > > > > > >>  >> >>
>> > > > >> > > > > > > >>  >> >> We have the similar solution that have been
>> > > > working
>> > > > >> in
>> > > > >> > > > > > production
>> > > > >> > > > > > > >>  since
>> > > > >> > > > > > > >>  >> >> 2014. You can see it here:
>> > > > >> > > https://github.com/resetius/ka
>> > > > >> > > > > > > >>  >> >> fka/commit/20658593e246d218490
>> > > > 6879defa2e763c4d413fb
>> > > > >> > > > > > > >>  >> >> The idea is very simple
>> > > > >> > > > > > > >>  >> >> 1. Disk balancer runs in a separate thread
>> > > inside
>> > > > >> > > scheduler
>> > > > >> > > > > > pool.
>> > > > >> > > > > > > >>  >> >> 2. It does not touch empty partitions
>> > > > >> > > > > > > >>  >> >> 3. Before it moves a partition it forcibly
>> > > creates
>> > > > >> new
>> > > > >> > > > > segment
>> > > > >> > > > > > > on a
>> > > > >> > > > > > > >>  >> >> destination disk
>> > > > >> > > > > > > >>  >> >> 4. It moves segment by segment from new to
>> > old.
>> > > > >> > > > > > > >>  >> >> 5. Log class works with segments on both
>> disks
>> > > > >> > > > > > > >>  >> >>
>> > > > >> > > > > > > >>  >> >> Your approach seems too complicated,
>> moreover
>> > it
>> > > > >> means
>> > > > >> > > that
>> > > > >> > > > > you
>> > > > >> > > > > > > >>  have to
>> > > > >> > > > > > > >>  >> >> patch different components of the system
>> > > > >> > > > > > > >>  >> >> Could you clarify what do you mean by
>> > > > >> > topicPartition.log?
>> > > > >> > > > Is
>> > > > >> > > > > it
>> > > > >> > > > > > > >>  >> >> topic-paritition/segment.log ?
>> > > > >> > > > > > > >>  >> >>
>> > > > >> > > > > > > >>  >> >> 12.01.2017, 21:47, "Dong Lin" <
>> > > > lindon...@gmail.com
>> > > > >> >:
>> > > > >> > > > > > > >>  >> >> > Hi all,
>> > > > >> > > > > > > >>  >> >> >
>> > > > >> > > > > > > >>  >> >> > We created KIP-113: Support replicas
>> > movement
>> > > > >> between
>> > > > >> > > log
>> > > > >> > > > > > > >>  >> directories.
>> > > > >> > > > > > > >>  >> >> > Please find the KIP wiki in the link
>> > > > >> > > > > > > >>  >> >> > *https://cwiki.apache.org/conf
>> > > > >> > > > > luence/display/KAFKA/KIP-113%
>> > > > >> > > > > > > >>  >> >> 3A+Support+replicas+movement+b
>> > > > >> etween+log+directories
>> > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/conf
>> > > > >> > > > > luence/display/KAFKA/KIP-113%
>> > > > >> > > > > > > >>  >> >> 3A+Support+replicas+movement+
>> > > > >> > between+log+directories>.*
>> > > > >> > > > > > > >>  >> >> >
>> > > > >> > > > > > > >>  >> >> > This KIP is related to KIP-112
>> > > > >> > > > > > > >>  >> >> > <https://cwiki.apache.org/conf
>> > > > >> > > > > luence/display/KAFKA/KIP-112%
>> > > > >> > > > > > > >>  >> >> 3A+Handle+disk+failure+for+JBOD>:
>> > > > >> > > > > > > >>  >> >> > Handle disk failure for JBOD. They are
>> > needed
>> > > in
>> > > > >> > order
>> > > > >> > > to
>> > > > >> > > > > > > support
>> > > > >> > > > > > > >>  >> JBOD in
>> > > > >> > > > > > > >>  >> >> > Kafka. Please help review the KIP. You
>> > > feedback
>> > > > is
>> > > > >> > > > > > appreciated!
>> > > > >> > > > > > > >>  >> >> >
>> > > > >> > > > > > > >>  >> >> > Thanks,
>> > > > >> > > > > > > >>  >> >> > Dong
>> > > > >> > > > > > >
>> > > > >> > > > > >
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to