Kyle Ambroff created KAFKA-5297:
-----------------------------------

             Summary: Broker can take a long time to shut down if there are many active log segments
                 Key: KAFKA-5297
                 URL: https://issues.apache.org/jira/browse/KAFKA-5297
             Project: Kafka
          Issue Type: Improvement
            Reporter: Kyle Ambroff
            Priority: Minor
         Attachments: shutdown-flame-graph.png

After the changes for KIP-33 were merged, we started noticing that our cluster 
restart times were quite a bit longer. In some cases it was taking four times 
as long as expected to do a rolling restart of every broker in the cluster. 
This meant that doing a deploy to one of our Kafka clusters went from taking 
about 3 hours to more than 12 hours!

We looked into this and collected data from a couple of runs with a sampling 
profiler. It turns out that it isn't unusual for a broker that has been running 
for several weeks to sit in kafka.log.Log#close for up to 30 minutes. There are 
so many active log segments that it simply takes a long time to truncate all of 
their indexes.

I've attached a flame graph that was generated from 10 minutes of stack samples 
collected during shutdown of a broker that took about 30 minutes total to shut 
down cleanly.

* About 60% of the time was spent in kafka.log.AbstractIndex#resize, where every index and timeindex file is truncated down to the size of the entries actually written to it.
* Another big chunk of time is spent reading the last entry from the index, which is used to make any final updates to the timeindex file. This is something that could be cached: for a broker that has been running for a long time, the bulk of these indexes are unlikely to still be in the page cache. We already cache largestTimestamp and offsetOfLargestTimestamp in LogSegment, so we could add a cache for this as well.

Looking at these changes and considering KIP-33, it isn't surprising that the 
broker shutdown time has increased so dramatically. The extra index plus the 
extra reads have increased the amount of work performed by kafka.log.Log#close 
by about 4x (in terms of system calls and potential page faults). Breaking down 
what this function does:

# Read the max timestamp from the timeindex. Could lead to a disk seek.
# Read the max offset from the index. Could lead to a disk seek.
# Append the timestamp and offset of the most recently written message to the timeindex if it hasn't already been written there.
# Truncate the index file
## Get the position in the index of the last entry written
## If on Windows then unmap and close the index
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the truncation. Leads to an lseek() system call.
## Close the newly reopened and mapped index
# Same thing as #4 but for the timeindex.
## Get the position in the timeindex of the last entry written
## If on Windows then unmap and close the timeindex
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the truncation. Leads to an lseek() system call.
## Close the newly reopened and mapped timeindex
# Finalize the log segment (this sequence is sketched just after this list)
## Invoke java.nio.channels.FileChannel#force, which leads to a fsync() for 
that log segment.
## Truncate the log segment if it doesn't have enough messages written to fill up the whole thing. Potentially leads to an ftruncate() system call.
## Set the position to the end of the segment after truncation. Leads to an lseek() system call.
## Close and unmap the channel.
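
To make the per-segment cost concrete, here is a rough Java sketch of what step 6 boils down to. The names are invented for illustration and this is not the actual LogSegment code; it just shows the system calls involved.

{code:java}
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

// Illustration only: finalize one segment by flushing it, trimming any
// unfilled tail, resetting the position, and closing the channel.
final class SegmentCloseSketch {
    static void finalizeSegment(Path segmentFile, long bytesWritten) throws Exception {
        try (FileChannel channel = new RandomAccessFile(segmentFile.toFile(), "rw").getChannel()) {
            channel.force(true);                // fsync() the segment
            if (channel.size() > bytesWritten) {
                channel.truncate(bytesWritten); // ftruncate() away the unused tail
            }
            channel.position(bytesWritten);     // lseek() to the logical end
        }                                       // close() the channel (and the file)
    }
}
{code}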

Looking into the current implementation of kafka.log.AbstractIndex#resize, it 
appears to do quite a bit of extra work to avoid keeping an instance of 
RandomAccessFile around. It has to reopen the file, truncate it, mmap() it, and 
potentially perform an additional disk seek, all before immediately closing the 
file again.
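
In other words, every index resize on shutdown currently looks roughly like the sketch below. This is a hand-written illustration with invented names, not the actual AbstractIndex code, but it captures the reopen/truncate/remap/close cycle:

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Illustration only: the reopen + ftruncate + mmap + close dance performed
// for every index and timeindex file during a clean shutdown.
final class IndexResizeSketch {
    static MappedByteBuffer resize(File indexFile, int entries, int entrySize,
                                   int oldPosition) throws IOException {
        long newLength = (long) entries * entrySize;
        RandomAccessFile raf = new RandomAccessFile(indexFile, "rw");    // reopen the file
        try {
            raf.setLength(newLength);                                    // ftruncate()
            MappedByteBuffer mmap = raf.getChannel()
                    .map(FileChannel.MapMode.READ_WRITE, 0, newLength);  // mmap()
            mmap.position((int) Math.min(oldPosition, newLength));       // restore the saved position
            return mmap;                                                 // the mapping outlives the close below
        } finally {
            raf.close();                                                 // immediately close the file again
        }
    }
}
{code}

All of that work happens once per index file, so twice per segment (index and timeindex), for every active segment on the broker.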

You wouldn't think this would amount to much, but I put together a benchmark 
using jmh to measure the difference between the current code and a new 
implementation that didn't have to recreate the page mapping during resize(), 
and the difference is pretty dramatic.

{noformat}
Result "currentImplementation":
  2063.386 ±(99.9%) 81.758 ops/s [Average]
  (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
  CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)

Result "optimizedImplementation":
  3497.354 ±(99.9%) 31.575 ops/s [Average]
  (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
  CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)

# Run complete. Total time: 00:03:37

Benchmark                                     Mode  Cnt     Score    Error  Units
LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ± 81.758  ops/s
LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ± 31.575  ops/s
{noformat}

I ran this benchmark on a Linux workstation. It just measures the throughput of 
Log#close after 20 segments have been created. Not having to reopen the file 
amounts to a 70% increase in throughput.
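
For reference, a jmh harness for this kind of measurement has roughly the shape below. This is a skeleton with the setup and close logic elided, not the exact benchmark code; the class and method names just mirror the results table above.

{code:java}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

// Skeleton only: measures how many times per second a freshly built log with
// 20 segments can be closed. The segment setup and close calls are elided.
@State(Scope.Thread)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class LogSegmentBenchmark {

    @Setup(Level.Invocation)
    public void createSegments() {
        // Placeholder: build a log directory with 20 segments plus their
        // index and timeindex files before every invocation.
    }

    @Benchmark
    public void currentImplementation() {
        // Placeholder: Log#close using the existing resize() path
        // (reopen + ftruncate + mmap per index).
    }

    @Benchmark
    public void optimizedImplementation() {
        // Placeholder: Log#close with a resize() that reuses the
        // already-open file and page mapping.
    }
}
{code}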

I think there are two totally valid approaches to making this better:

* Preemptively truncate index files when log rotation happens. Once a log is rotated, jobs could be added to an ExecutorService which truncates indexes so that they don't all have to be truncated on shutdown. The new shutdown code would enqueue all remaining active indexes and then drain the queue.
* Alternatively, we could just add a RandomAccessFile instance variable to AbstractIndex so that it doesn't have to recreate the page mapping on resize() (see the sketch after this list). This means an extra file handle for each segment, but that doesn't seem like a big deal to me.
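
A minimal sketch of the second option, assuming we keep the RandomAccessFile open for the lifetime of the index (class and field names are invented; this is not a patch):

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Sketch: keep the RandomAccessFile (one extra fd per index) so resize()
// is just an ftruncate(), with no reopen and no new page mapping.
final class ResizableIndexSketch implements AutoCloseable {
    private final RandomAccessFile file;   // held for the lifetime of the index
    private final MappedByteBuffer mmap;   // created once, up front

    ResizableIndexSketch(File indexFile, long maxLength) throws IOException {
        this.file = new RandomAccessFile(indexFile, "rw");
        this.file.setLength(maxLength);
        this.mmap = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, maxLength);
    }

    // Shrink the file to the entries actually written. Assumption: we only
    // shrink on close and never touch mapped pages past the new end, so the
    // existing mapping can be left in place.
    void resize(long newLength) throws IOException {
        file.setLength(newLength);                            // ftruncate(), nothing else
        mmap.limit((int) Math.min(mmap.limit(), newLength));  // keep reads inside the new bounds
    }

    @Override
    public void close() throws IOException {
        file.close();
    }
}
{code}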

No matter what, we should add a cache for kafka.log.OffsetIndex#lastEntry.
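
A minimal sketch of what that cache could look like, with hypothetical names (the real change would live in the index classes): the entry is remembered on the write path, so close() never has to fault a cold index page back in just to read the last entry.

{code:java}
// Sketch: remember the last entry written so readers (including close())
// never have to touch the mmap'd index just to find it. Names are invented.
final class CachedLastEntrySketch {
    static final class IndexEntry {
        final long offset;
        final int position;
        IndexEntry(long offset, int position) {
            this.offset = offset;
            this.position = position;
        }
    }

    private volatile IndexEntry lastEntry = new IndexEntry(-1L, 0);

    // Write path: the entry is already in hand, so caching it is free.
    void append(long offset, int position) {
        // ... write the entry into the mmap'd index as usual ...
        lastEntry = new IndexEntry(offset, position);
    }

    // Read path: close() and friends use the cached copy instead of
    // re-reading the tail of the index file.
    IndexEntry lastEntry() {
        return lastEntry;
    }
}
{code}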



