[ https://issues.apache.org/jira/browse/KAFKA-3064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083379#comment-15083379 ]

Michael Graff commented on KAFKA-3064:
--------------------------------------

We have a 65 GB per partition retention policy.

Example:

TL;DR:  With the current re-sync model, we ended up transferring roughly 6.3 TB 
of useful data and roughly 15.5 TB of wasted data over a 38-hour sync.  That is 
far longer than the “just calculate how much data we need to transfer” model: 
3.2 TB @ 300 MB/sec = about 3 hours.
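
For reference, here is where the 3.2 TB figure comes from, as a quick Python 
sketch (it assumes the four topics hold roughly 792 GB each, as described below):

    # Ideal model: only move the data we actually need to keep
    total_gb = 4 * 792                     # four topics at ~792 GB retained each, ~3.2 TB
    hours = total_gb * 1000 / 300 / 3600   # at 300 MB/sec sustained
    print(total_gb, round(hours, 1))       # 3168 GB, ~2.9 hours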

We ingest about 162 MB/sec across all our topics.  In theory we have 400 MB/sec 
of disk bandwidth (“iozone” measured), which we expected to see on transfers, 
but in practice we see closer to 300 MB/sec when using Kafka to sync.

300 - 162 = 138 MB/sec spare, which when divided across our 48 partitions on 4 
topics, yields 2.875 MB/sec per partition, or 34.5 MB/sec per topic.

“base” ingests at 60 MB/sec, or 5 MB/sec per partition.
“base-text” ingests at 80 MB/sec, or 6.7 MB/sec per partition.
“res” ingests at 10 MB/sec, or 0.8 MB/sec per partition.
“res-text” ingests at 12 MB/sec, or 1 MB/sec per partition.

Each topic has 792 GB of data.
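
A minimal Python sketch of the bandwidth split above (the rates and partition 
counts are the assumptions stated in this comment, nothing more):

    # Spare bandwidth after ingest, split evenly across partitions
    ingest = {"base": 60, "base-text": 80, "res": 10, "res-text": 12}   # MB/sec per topic
    partitions_per_topic = 12
    spare = 300 - sum(ingest.values())                             # 300 - 162 = 138 MB/sec
    per_partition = spare / (len(ingest) * partitions_per_topic)   # 138 / 48 = 2.875 MB/sec
    per_topic = per_partition * partitions_per_topic               # 34.5 MB/sec
    print(spare, per_partition, per_topic)
    for topic, rate in ingest.items():
        print(topic, round(rate / partitions_per_topic, 1))        # 5.0, 6.7, 0.8, 1.0 MB/sec per partition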

At 2.9 MB/sec of spare bandwidth per partition, only res and res-text make 
progress.  They progress as follows:

res: 34.5 - 10 = 24.5 MB/sec progress is made, and T = 792,000 / 24.5 = 32,326 
seconds (8.9 hours).  Res ultimately completes first.  We transferred 10 MB/sec 
of data that is ultimately discarded, which totals 323.3 GB of wasted transfer.

res-text:  34.5 - 12 = 22.5 MB/sec.  Progress is made, and in T seconds 727 GB 
is transferred, leaving 65 GB.  We transferred T * 12 MB/sec = 388 GB of data 
which is ultimately discarded.

base: no progress, but T * 34.5 MB of data is received, and discarded because 
it has already expired off source.  In T seconds, we transferred T * 34.5 
MB/sec = 1115 GB of useless data, as it was all discarded.

base-text: no progress, same calculation.  1115 GB wasted.

Total success:  792 + 727 = 1519 GB synced.
Total wasted:  323 + 388 + 1115 + 1115 = 2941 GB wasted.
Phase 1 time: 32,326 seconds (8.9 hours)
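
(Quick Python check of the phase-1 arithmetic, using the 34.5 MB/sec per-topic 
share assumed above; sizes in MB:)

    share = 34.5
    T = 792_000 / (share - 10)            # res closes its 792 GB gap at 24.5 MB/sec: ~32,326 s (the 8.9 hours above)
    print(T, T / 3600)
    print((share - 12) * T / 1000)        # res-text progress: ~727 GB, leaving ~65 GB
    print(10 * T / 1000, 12 * T / 1000)   # discarded on res / res-text: ~323 GB / ~388 GB
    print(share * T / 1000)               # discarded on each of base and base-text: ~1115 GB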

In “phase 2” where we have 1 topic caught up, we divide 138 / 3 = 46 MB/sec of 
per-topic bandwidth.

res-text has the smallest remaining gap, so it catches up next.

res-text:  46 - 12 = 34 MB/sec.  The remaining 65 GB transfers in T = 1,912 
seconds.  T * 12 = 23 GB is wasted due to expiry.
res:  100% success; T * 10 = 19 GB, all useful.
base: no progress, since ingest > bandwidth.  However, in T seconds, T * 46 = 
88 GB is wasted.
base-text:  same issue, 88 GB wasted.

Phase 2 total success:  65 + 19 = 84 GB (1603 running total)
Phase 2 total loss:  23 + 88 + 88 = 199 GB (3140 running total)
Phase 2 time:  1912 seconds (34,238 seconds, or 9.5 hours)
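
(Phase-2 check in Python, same assumptions, now with 138 / 3 = 46 MB/sec per 
remaining topic:)

    share = 138 / 3
    T = 65_000 / (share - 12)    # res-text closes its remaining 65 GB: ~1,912 s
    print(T)
    print(12 * T / 1000)         # discarded on res-text before it catches up: ~23 GB
    print(10 * T / 1000)         # res is already caught up, so its ~19 GB is all useful
    print(share * T / 1000)      # discarded on each of base and base-text: ~88 GB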


Phase 3: Now we have two topics caught up, and 138 / 2 = 69 MB/sec available 
per remaining topic.

base will be next.

base: 69 - 60 = 9 MB/sec.  To transfer the 792 GB of data, it will take 88,000 
seconds.  During this time, we transfer but discard T * 60 MB/sec, or 5280 GB.
base-text: no progress, as its 80 MB/sec ingest exceeds the 69 MB/sec 
available.  T * 69 MB/sec = 6072 GB is wasted.
res: 100% progress.  In T seconds, we transfer T * 10 = 880 GB.
res-text: 100% progress.  In T seconds, we transfer T * 12 = 1056 GB.

Phase 3 total success:  792 + 880 + 1056 = 2728 GB (4331 GB running total)
Phase 3 total loss: 5280 + 6072 = 11,352 GB (14,492 GB running total)
Phase 3 total time: 88,000 seconds (24.4 hours) (running total: 122,238 
seconds, or 34 hours)
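
(Phase-3 check in Python, 138 / 2 = 69 MB/sec per remaining topic:)

    share = 138 / 2
    T = 792_000 / (share - 60)               # base closes its 792 GB gap at 9 MB/sec: 88,000 s, ~24.4 h
    print(T, T / 3600)
    print(60 * T / 1000, share * T / 1000)   # discarded: 5,280 GB on base, 6,072 GB on base-text
    print(10 * T / 1000, 12 * T / 1000)      # useful data on the caught-up topics: 880 GB and 1,056 GB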

Finally, we have all but one topic caught up, so the whole 138 MB/sec goes 
there.

base-text:  138 - 80 = 58 MB/sec.  792 GB / 58 MB/sec = 13,655 seconds.  T * 80 
= 1092 GB wasted.
base: 100% success, T * 60 = 819 GB
res: 100% success, T * 10 = 137 GB.
res-text: 100% success, T * 12 = 164 GB.

Phase 4 total success: 792 + 819 + 137 + 164 = 1912 GB (6243 GB running total)
Phase 4 total loss: 1092 GB (15,584 GB running total)
Phase 4 total time: 13,655 seconds (3.8 hours) (135,893 seconds, or 37.7 hours)
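
(Phase-4 check in Python, with the full 138 MB/sec spare going to base-text:)

    T = 792_000 / (138 - 80)                             # ~13,655 s, ~3.8 h
    print(T, T / 3600)
    print(80 * T / 1000)                                 # discarded on base-text: ~1,092 GB
    print(60 * T / 1000, 10 * T / 1000, 12 * T / 1000)   # useful: ~819, ~137, ~164 GB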

This closely matches what we saw in practice.  Note that even over-provisioning 
the server to transfer more than double the data to the other broker still does 
not yield significantly different results as the “big partitions” still 
transfer a lot of data uselessly in the initial phases.

If we had a way to tell the broker to just track the newest data, and only 
declare itself in sync once it also has the same oldest data as the source, 
we’d have caught up in less than 22 hours and wasted nothing.
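
As a rough bound for that model (my own back-of-envelope, assuming the broker 
keeps up with the head at ingest speed and the entire 138 MB/sec of spare 
bandwidth backfills the 4 x 792 GB of retained tail, none of which expires 
before it is used):

    tail_gb = 4 * 792
    hours = tail_gb * 1000 / 138 / 3600
    print(round(hours, 1))   # ~6.4 hours of backfill, zero wasted transfer

That lands comfortably inside the “less than 22 hours” figure, with nothing 
discarded.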

With the current re-sync model, we ended up transferring 6.3 TB of useful data 
in these 38 hours, and 15.5 TB of wasted data.

> Improve resync method to waste less time and data transfer
> ----------------------------------------------------------
>
>                 Key: KAFKA-3064
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3064
>             Project: Kafka
>          Issue Type: Improvement
>          Components: controller, network
>            Reporter: Michael Graff
>            Assignee: Neha Narkhede
>
> We have several topics which are large (65 GB per partition) with 12 
> partitions.  Data rates into each topic vary, but in general each one has its 
> own rate.
> After a RAID rebuild, we are pulling all the data over to the newly rebuilt 
> RAID.  This takes forever, and has yet to complete after nearly 8 hours.
> Here are my observations:
> (1)  The Kafka broker seems to pull from all topics on all partitions at the 
> same time, starting at the oldest message.
> (2)  When you divide the total disk bandwidth available across all partitions 
> (really, only 48 of which have significant amounts of data, about 65 GB 
> each), the ingest rate of 36 out of the 48 is higher than the available 
> bandwidth.
> (3)  The effect of (2) is that one topic SLOWLY catches up, while the other 4 
> topics continue to retrieve data at 75% of the bandwidth, just to toss it 
> away because the source broker has discarded it already.
> (4)  Eventually that one topic catches up, and the remaining bandwidth is 
> then divided into the remaining 36 partitions, one group of which starts to 
> catch up again.
> What I want to see is a way to say “don’t transfer more than X partitions at 
> the same time” and ideally a priority rule that says, “Transfer partitions 
> you are responsible for first, then transfer ones you are not.  Also, 
> transfer these first, then those, but no more than 1 topic at a time”
> What I REALLY want is for Kafka to track the new data (track the head of the 
> log) and then ask for the tail in chunks.  Ideally this would request from 
> the source, “what is the next logical older starting point?” and then start 
> there.  This way, the transfer basically becomes a file transfer of the log 
> stored on the source’s disk. Once that block is retrieved, it moves on to the 
> next oldest.  This way, there is almost zero waste as both the head and tail 
> grow, but the tail runs the risk of losing the final chunk only.  Thus, 
> bandwidth is not significantly wasted.
> All this changes the ISR check to “am I caught up on head AND tail?”, whereas 
> today the tail part is only implied.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
