Hey Alexey,

Thanks for all the comments!

I have updated the KIP to specify how we enforce the quota. I also updated the
section "The thread model and broker logic for moving replica data between log
directories" to make it easier to read. You can find the exact change here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>.
The idea is to use the same replication quota mechanism introduced in
KIP-73.
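
To make the idea concrete, here is a rough sketch in Python of how a byte-rate
cap on replica movement between log directories could behave. This is only an
illustration under my own assumptions: it is not Kafka code and not KIP-73's
actual implementation, and the names MoveThrottle, max_bytes_per_sec and
record are hypothetical. It caps the cumulative average rate, which is a
simplification.

```python
import time


class MoveThrottle:
    """Hypothetical byte-rate cap for copying a replica between log directories."""

    def __init__(self, max_bytes_per_sec, clock=time.monotonic):
        if max_bytes_per_sec <= 0:
            raise ValueError("max_bytes_per_sec must be positive")
        self.rate = float(max_bytes_per_sec)
        self.clock = clock          # injectable so the logic can be tested
        self.start = clock()        # time at which the move started
        self.total = 0              # bytes copied so far

    def record(self, nbytes):
        """Record nbytes copied and return how many seconds the caller
        should pause so the average rate stays at or below the cap."""
        self.total += nbytes
        elapsed = self.clock() - self.start
        # Minimum time that copying `total` bytes should take at the cap.
        min_elapsed = self.total / self.rate
        return max(0.0, min_elapsed - elapsed)
```

With a cap of 1,000,000 bytes per second, recording a 500,000-byte chunk
copied with no time elapsed asks the caller to pause for 0.5 seconds before
the next read from the old log directory.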

Thanks,
Dong



On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
wrote:

>
>
> 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:
> > Hey Alexey,
> >
> > Thanks. I think we agreed that the suggested solution doesn't work in
> > general for kafka users. To answer your questions:
> >
> > 1. I agree we need quota to rate limit replica movement when a broker is
> > moving a "leader" replica. I will come up with a solution, probably re-using
> > the replication quota config introduced in KIP-73.
> >
> > 2. Good point. I agree that this is a problem in general. If there is no new
> > data on that broker, with the current default values of
> > replica.fetch.wait.max.ms and replica.fetch.max.bytes, the replica will be
> > moved at only 2 MBps throughput. I think the solution is for the broker to
> > set replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding
> > ReplicaFetcherThread needs to move some replica to another disk.
> >
> > 3. I have updated the KIP to mention that the read size of a given
> > partition is configured using replica.fetch.max.bytes when we move replicas
> > between disks.
> >
> > Please see this
> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5>
> > for the change of the KIP. I will come up with a solution to throttle
> > replica movement when a broker is moving a "leader" replica.
>
> Thanks. It looks great.
>
> >
> > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <aozerit...@yandex.ru>
> > wrote:
> >
> >>  23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:
> >>  > Thanks. Please see my comment inline.
> >>  >
> >>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <
> aozerit...@yandex.ru>
> >>  > wrote:
> >>  >
> >>  >> 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:
> >>  >> > Hey Alexey,
> >>  >> >
> >>  >> > Thanks for your review and the alternative approach. Here is my
> >>  >> > understanding of your patch. Kafka's background threads are used to
> >>  >> > move data between replicas. When data movement is triggered, the log
> >>  >> > will be rolled and the new logs will be put in the new directory, and
> >>  >> > background threads will move segments from the old directory to the
> >>  >> > new directory.
> >>  >> >
> >>  >> > It is important to note that KIP-112 is intended to work with KIP-113
> >>  >> > to support JBOD. I think your solution is definitely simpler and
> >>  >> > better under the current Kafka implementation, in which a broker will
> >>  >> > fail if any disk fails. But I am not sure if we want to allow a broker
> >>  >> > to run with partial disk failure. Let's say a replica is being moved
> >>  >> > from log_dir_old to log_dir_new and then log_dir_old stops working due
> >>  >> > to disk failure. How would your existing patch handle it? To make the
> >>  >> > scenario a bit more
> >>  >>
> >>  >> We will lose log_dir_old. After broker restart we can read the data from
> >>  >> log_dir_new.
> >>  >
> >>  > No, you probably can't. This is because the broker doesn't have *all* the
> >>  > data for this partition. For example, say the broker has
> >>  > partition_segment_1, partition_segment_50 and partition_segment_100 on
> >>  > log_dir_old. partition_segment_100, which has the latest data, has been
> >>  > moved to log_dir_new, and log_dir_old fails before partition_segment_50
> >>  > and partition_segment_1 are moved to log_dir_new. When the broker
> >>  > restarts, it won't have partition_segment_50. This causes a problem if the
> >>  > broker is elected leader and a consumer wants to consume data in
> >>  > partition_segment_1.
> >>
> >>  Right.
> >>
> >>  >
> >>  >> > complicated, let's say the broker is shut down, log_dir_old's disk
> >>  >> > fails, and the broker starts. In this case the broker doesn't even know
> >>  >> > if log_dir_new has all the data needed for this replica. It becomes a
> >>  >> > problem if the broker is elected leader of this partition in this case.
> >>  >>
> >>  >> log_dir_new contains the most recent data so we will lose the tail of
> >>  >> the partition.
> >>  >> This is not a big problem for us because we already delete tails by hand
> >>  >> (see https://issues.apache.org/jira/browse/KAFKA-1712).
> >>  >> Also we don't use automatic leader balancing
> >>  >> (auto.leader.rebalance.enable=false), so this partition becomes the
> >>  >> leader with a low probability.
> >>  >> I think my patch can be modified to prohibit the selection of the leader
> >>  >> until the partition has been moved completely.
> >>  >
> >>  > I guess you are saying that you have deleted the tails by hand in your
> >>  > own Kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I
> >>  > am not
> >>
> >>  No. We just modify the segments' mtime with a cron job. This works with
> >>  vanilla Kafka.
> >>
> >>  > sure if it is the right solution. How would this solution address the
> >>  > problem mentioned above?
> >>
> >>  If you need only fresh data and you remove old data by hand, this is
> >>  not a problem. But in the general case this is a problem, of course.
> >>
> >>  >
> >>  > BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way
> >>  > to address its problem. Now that we have a timestamp in the message, we
> >>  > can use that to delete old segments instead of relying on the log segment
> >>  > mtime. Just an idea; we don't have to discuss this problem here.
> >>  >
> >>  >> >
> >>  >> > The solution presented in the KIP attempts to handle it by replacing
> >>  >> > the replica in an atomic fashion after the log in the new dir has
> >>  >> > fully caught up with the log in the old dir. At any time the log can be
> >>  >> > considered to exist on only one log directory.
> >>  >>
> >>  >> As I understand your solution does not cover quotas.
> >>  >> What happens if someone starts to transfer 100 partitions ?
> >>  >
> >>  > Good point. Quota can be implemented in the future. It is currently
> >>  > mentioned as a potential future improvement in KIP-112
> >>  > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>.
> >>  > Thanks for the reminder. I will move it to KIP-113.
> >>  >
> >>  >> > If yes, it will read a ByteBufferMessageSet from topicPartition.log
> >>  >> > and append the message set to topicPartition.move
> >>  >>
> >>  >> i.e. processPartitionData will read data from the beginning of
> >>  >> topicPartition.log? What is the read size?
> >>  >> A ReplicaFetchThread reads many partitions so if one does some
> >>  >> complicated work (= read a lot of data from disk) everything will slow
> >>  >> down.
> >>  >> I think read size should not be very big.
> >>  >>
> >>  >> On the other hand at this point (processPartitionData) one can use only
> >>  >> the new data (ByteBufferMessageSet from parameters) and wait until
> >>  >> (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
> >>  >> && topicPartition.log.largestOffset == topicPartition.log.largestOffset).
> >>  >> In this case the write speed to topicPartition.move and
> >>  >> topicPartition.log will be the same so this will allow us to move many
> >>  >> partitions to one disk.
> >>  >
> >>  > The read size of a given partition is configured
> >>  > using replica.fetch.max.bytes, which is the same size used by FetchRequest
> >>  > from follower to leader. If the broker is moving a replica for which it
> >>
> >>  OK. Could you mention it in KIP?
> >>
> >>  > acts as a follower, the disk write rate for moving this replica is at
> >>  > most the rate it fetches from the leader (assuming it is catching up and
> >>  > has sufficient data to read from the leader), which is subject to the
> >>  > round-trip time between itself and the leader. Thus this part is probably
> >>  > fine even without quota.
> >>
> >>  I think there are 2 problems
> >>  1. Without a speed limiter this will not work well even for 1 partition. In
> >>  our production we had a problem so we added a throughput limiter:
> >>  https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
> >>
> >>  2. I don't understand how it will work in the case of a big
> >>  replica.fetch.wait.max.ms and a partition with irregular flow.
> >>  For example someone could have replica.fetch.wait.max.ms=10minutes and a
> >>  partition that has very high data flow from 12:00 to 13:00 and zero flow
> >>  otherwise.
> >>  In this case processPartitionData could be called once per 10 minutes, so
> >>  if we start moving data at 13:01 it will be finished the next day.
> >>
> >>  >
> >>  > But if the broker is moving a replica for which it acts as a leader, as
> >>  > of the current KIP the broker will keep reading from log_dir_old and
> >>  > append to log_dir_new without having to wait for round-trip time. We
> >>  > probably need quota for this in the future.
> >>  >
> >>  >> >
> >>  >> > And to answer your question, yes topicPartition.log refers to
> >>  >> > topic-partition/segment.log.
> >>  >> >
> >>  >> > Thanks,
> >>  >> > Dong
> >>  >> >
> >>  >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <
> >>  aozerit...@yandex.ru>
> >>  >> > wrote:
> >>  >> >
> >>  >> >> Hi,
> >>  >> >>
> >>  >> >> We have a similar solution that has been working in production since
> >>  >> >> 2014. You can see it here:
> >>  >> >> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> >>  >> >> The idea is very simple
> >>  >> >> 1. Disk balancer runs in a separate thread inside scheduler pool.
> >>  >> >> 2. It does not touch empty partitions
> >>  >> >> 3. Before it moves a partition it forcibly creates a new segment on
> >>  >> >> the destination disk
> >>  >> >> 4. It moves segment by segment from new to old.
> >>  >> >> 5. Log class works with segments on both disks
> >>  >> >>
> >>  >> >> Your approach seems too complicated; moreover, it means that you have
> >>  >> >> to patch different components of the system.
> >>  >> >> Could you clarify what you mean by topicPartition.log? Is it
> >>  >> >> topic-partition/segment.log ?
> >>  >> >>
> >>  >> >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
> >>  >> >> > Hi all,
> >>  >> >> >
> >>  >> >> > We created KIP-113: Support replicas movement between log
> >>  >> >> > directories. Please find the KIP wiki in the link
> >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
> >>  >> >> >
> >>  >> >> > This KIP is related to KIP-112
> >>  >> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> >>  >> >> > Handle disk failure for JBOD. They are needed in order to support
> >>  >> >> > JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!
> >>  >> >> >
> >>  >> >> > Thanks,
> >>  >> >> > Dong
>
