Hey Alexey,

Thanks for all the comments!
I have updated the KIP to specify how we enforce the quota. I also updated the "The thread model and broker logic for moving replica data between log directories" section to make it easier to read. You can find the exact change here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=5&selectedPageVersions=6>. The idea is to reuse the replication quota mechanism introduced in KIP-73.
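To make this concrete, here is a rough sketch of what a KIP-73-style byte-rate cap looks like when applied to the intra-broker copy path. The names below (ByteRateThrottler, ThrottledCopy) are illustrative only and are not the classes or configs proposed in the KIP:

    import java.nio.file.{Files, Path}

    // Hypothetical sketch: a long-run average byte-rate limiter that a
    // replica-movement thread could consult before each copied chunk,
    // mirroring the KIP-73 idea of capping replication bytes per second.
    class ByteRateThrottler(bytesPerSecond: Long) {
      private val startMs = System.currentTimeMillis()
      private var totalBytes = 0L

      // Block until copying `bytes` more bytes keeps the average rate under the cap.
      def throttle(bytes: Int): Unit = synchronized {
        totalBytes += bytes
        val minElapsedMs = totalBytes * 1000 / bytesPerSecond
        val sleepMs = minElapsedMs - (System.currentTimeMillis() - startMs)
        if (sleepMs > 0) Thread.sleep(sleepMs)
      }
    }

    object ThrottledCopy {
      // Copy one segment file chunk by chunk, consulting the throttler after
      // each chunk so that moving many partitions cannot saturate the disks.
      def copySegment(src: Path, dst: Path, throttler: ByteRateThrottler,
                      chunkSize: Int = 1024 * 1024): Unit = {
        val in = Files.newInputStream(src)
        val out = Files.newOutputStream(dst)
        try {
          val buf = new Array[Byte](chunkSize)
          var n = in.read(buf)
          while (n != -1) {
            out.write(buf, 0, n)
            throttler.throttle(n)
            n = in.read(buf)
          }
        } finally {
          in.close()
          out.close()
        }
      }
    }

In the KIP the cap is enforced through the existing replication quota machinery rather than a standalone helper like this, but the effect is the same: the copy loop backs off once it exceeds the configured bytes-per-second budget.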
Thanks,
Dong

On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:

> 24.01.2017, 22:03, "Dong Lin" <lindon...@gmail.com>:
> > Hey Alexey,
> >
> > Thanks. I think we agreed that the suggested solution doesn't work in general for Kafka users. To answer your questions:
> >
> > 1. I agree we need quota to rate-limit replica movement when a broker is moving a "leader" replica. I will come up with a solution, probably reusing the replication quota config introduced in KIP-73.
> >
> > 2. Good point. I agree that this is a problem in general. If there is no new data on that broker, then with the current default values of replica.fetch.wait.max.ms and replica.fetch.max.bytes the replica will be moved at only 2 MBps throughput. I think the solution is for the broker to set replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding ReplicaFetcherThread needs to move some replica to another disk.
> >
> > 3. I have updated the KIP to mention that the read size of a given partition is configured using replica.fetch.max.bytes when we move replicas between disks.
> >
> > Please see this <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5> for the change of the KIP. I will come up with a solution to throttle replica movement when a broker is moving a "leader" replica.
>
> Thanks. It looks great.
>
> > On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
> >
> >> 23.01.2017, 22:11, "Dong Lin" <lindon...@gmail.com>:
> >> > Thanks. Please see my comment inline.
> >> >
> >> > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
> >> >
> >> >> 13.01.2017, 22:29, "Dong Lin" <lindon...@gmail.com>:
> >> >> > Hey Alexey,
> >> >> >
> >> >> > Thanks for your review and the alternative approach. Here is my understanding of your patch. Kafka's background threads are used to move data between log directories. When data movement is triggered, the log will be rolled, the new logs will be put in the new directory, and background threads will move segments from the old directory to the new directory.
> >> >> >
> >> >> > It is important to note that KIP-112 is intended to work with KIP-113 to support JBOD. I think your solution is definitely simpler and better under the current Kafka implementation, where a broker will fail if any disk fails. But I am not sure we want to allow a broker to run with partial disk failure. Let's say a replica is being moved from log_dir_old to log_dir_new and then log_dir_old stops working due to disk failure. How would your existing patch handle it? To make the scenario a bit more
> >> >>
> >> >> We will lose log_dir_old. After a broker restart we can read the data from log_dir_new.
> >> >
> >> > No, you probably can't. This is because the broker doesn't have *all* the data for this partition. For example, say the broker has partition_segment_1, partition_segment_50 and partition_segment_100 on log_dir_old. partition_segment_100, which has the latest data, has been moved to log_dir_new, and log_dir_old fails before partition_segment_50 and partition_segment_1 are moved to log_dir_new. When the broker restarts, it won't have partition_segment_50. This causes a problem if the broker is elected leader and a consumer wants to consume data in partition_segment_1.
> >>
> >> Right.
> >>
> >> >> > complicated, let's say the broker is shut down, log_dir_old's disk fails, and the broker starts. In this case the broker doesn't even know whether log_dir_new has all the data needed for this replica. It becomes a problem if the broker is elected leader of this partition in this case.
> >> >>
> >> >> log_dir_new contains the most recent data, so we will lose the tail of the partition. This is not a big problem for us because we already delete tails by hand (see https://issues.apache.org/jira/browse/KAFKA-1712). Also we don't use automatic leader balancing (auto.leader.rebalance.enable=false), so this partition becomes the leader with low probability. I think my patch can be modified to prohibit the selection of the leader until the partition has moved completely.
> >> >
> >> > I guess you are saying that you have deleted the tails by hand in your own Kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am not sure it is the right solution.
> >>
> >> No. We just modify the segments' mtime via a cron job. This works with vanilla Kafka.
> >>
> >> > How would this solution address the problem mentioned above?
> >>
> >> If you only need fresh data and you remove old data by hand, this is not a problem. But in the general case it is a problem, of course.
> >>
> >> > BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to address its problem. Now that we have timestamps in the messages, we can use those to delete old segments instead of relying on the log segment mtime. Just an idea; we don't have to discuss this problem here.
> >> >
> >> >> > The solution presented in the KIP attempts to handle it by replacing the replica in an atomic fashion after the log in the new dir has fully caught up with the log in the old dir. At any time the log can be considered to exist in only one log directory.
> >> >>
> >> >> As I understand it, your solution does not cover quotas. What happens if someone starts to transfer 100 partitions?
> >> >
> >> > Good point. Quota can be implemented in the future. It is currently mentioned as a potential future improvement in KIP-112 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>. Thanks for the reminder. I will move it to KIP-113.
> >> >
> >> >> > If yes, it will read a ByteBufferMessageSet from topicPartition.log and append the message set to topicPartition.move.
> >> >>
> >> >> i.e. processPartitionData will read data from the beginning of topicPartition.log? What is the read size?
> >> >> A ReplicaFetcherThread reads many partitions, so if one of them does some complicated work (= reads a lot of data from disk) everything will slow down. I think the read size should not be very big.
> >> >>
> >> >> On the other hand, at this point (processPartitionData) one can use only the new data (the ByteBufferMessageSet from the parameters) and wait until (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset && topicPartition.move.largestOffset == topicPartition.log.largestOffset). In this case the write speed to topicPartition.move and topicPartition.log will be the same, so this will allow us to move many partitions to one disk.
> >> >
> >> > The read size of a given partition is configured using replica.fetch.max.bytes, which is the same size used by the FetchRequest from follower to leader.
> >>
> >> OK. Could you mention it in the KIP?
> >>
> >> > If the broker is moving a replica for which it acts as a follower, the disk write rate for moving this replica is at most the rate at which it fetches from the leader (assuming it is catching up and has sufficient data to read from the leader, which is subject to the round-trip time between itself and the leader). Thus this part is probably fine even without quota.
> >>
> >> I think there are 2 problems:
> >> 1. Without a speed limiter this will not work well even for 1 partition. In our production we had a problem, so we implemented a throughput limiter: https://github.com/resetius/kafka/commit/cda31dadb2f135743bf41083062927886c5ddce1#diff-ffa8861e850121997a534ebdde2929c6R713
> >> 2. I don't understand how it will work in the case of a big replica.fetch.wait.max.ms and a partition with irregular flow. For example, someone could have replica.fetch.wait.max.ms=10 minutes and a partition that has very high data flow from 12:00 to 13:00 and zero flow otherwise. In this case processPartitionData could be called once per 10 minutes, so if we start moving data at 13:01 it will be finished the next day.
> >>
> >> > But if the broker is moving a replica for which it acts as a leader, as of the current KIP the broker will keep reading from log_dir_old and appending to log_dir_new without having to wait for the round-trip time. We probably need quota for this in the future.
> >> >
> >> >> > And to answer your question, yes, topicPartition.log refers to topic-partition/segment.log.
> >> >> >
> >> >> > Thanks,
> >> >> > Dong
> >> >> >
> >> >> > On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky <aozerit...@yandex.ru> wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >>
> >> >> >> We have a similar solution that has been working in production since 2014. You can see it here: https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> >> >> >> The idea is very simple:
> >> >> >> 1. The disk balancer runs in a separate thread inside the scheduler pool.
> >> >> >> 2. It does not touch empty partitions.
> >> >> >> 3. Before it moves a partition it forcibly creates a new segment on the destination disk.
> >> >> >> 4. It moves segment by segment from new to old.
> >> >> >> 5. The Log class works with segments on both disks.
> >> >> >>
> >> >> >> Your approach seems too complicated; moreover, it means that you have to patch different components of the system.
> >> >> >> Could you clarify what you mean by topicPartition.log? Is it topic-partition/segment.log?
> >> >> >>
> >> >> >> 12.01.2017, 21:47, "Dong Lin" <lindon...@gmail.com>:
> >> >> >> > Hi all,
> >> >> >> >
> >> >> >> > We created KIP-113: Support replicas movement between log directories. Please find the KIP wiki at <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>.
> >> >> >> >
> >> >> >> > This KIP is related to KIP-112 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>: Handle disk failure for JBOD. They are needed in order to support JBOD in Kafka. Please help review the KIP. Your feedback is appreciated!
> >> >> >> >
> >> >> >> > Thanks,
> >> >> >> > Dong
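A footnote on the catch-up check discussed in the quoted thread above: the destination log (topicPartition.move) can be swapped in once it covers the same offset range as the source log (topicPartition.log). A bare-bones sketch of that check, with illustrative names only (LogView and caughtUp are not Kafka classes):

    object MoveCheck {
      // Illustrative stand-in for whatever exposes a log's offset range.
      case class LogView(smallestOffset: Long, largestOffset: Long)

      // topicPartition.move has caught up with topicPartition.log once it starts
      // no later than the old log and ends at the same largest offset; only then
      // can the replica be switched to the new log directory atomically.
      def caughtUp(move: LogView, log: LogView): Boolean =
        move.smallestOffset <= log.smallestOffset &&
          move.largestOffset == log.largestOffset
    }

This mirrors the condition Alexey suggests above; the KIP's own wording is simply that the log in the new directory must have fully caught up with the log in the old directory before the replica is replaced atomically.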